Whamcloud - gitweb
b8353c530d50603b44a58c92ee6d32b0bcf565e0
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include "osc_internal.h"
62 #include "osc_cl_internal.h"
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65 static int brw_interpret(const struct lu_env *env,
66                          struct ptlrpc_request *req, void *data, int rc);
67 int osc_cleanup(struct obd_device *obd);
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         struct obd_import *imp = class_exp2cliimp(exp);
108         ENTRY;
109
110         if (lmm != NULL) {
111                 if (lmm_bytes < sizeof (*lmm)) {
112                         CERROR("lov_mds_md too small: %d, need %d\n",
113                                lmm_bytes, (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (lmm->lmm_object_id == 0) {
119                         CERROR("lov_mds_md: zero lmm_object_id\n");
120                         RETURN(-EINVAL);
121                 }
122         }
123
124         lsm_size = lov_stripe_md_size(1);
125         if (lsmp == NULL)
126                 RETURN(lsm_size);
127
128         if (*lsmp != NULL && lmm == NULL) {
129                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
130                 OBD_FREE(*lsmp, lsm_size);
131                 *lsmp = NULL;
132                 RETURN(0);
133         }
134
135         if (*lsmp == NULL) {
136                 OBD_ALLOC(*lsmp, lsm_size);
137                 if (*lsmp == NULL)
138                         RETURN(-ENOMEM);
139                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
140                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
141                         OBD_FREE(*lsmp, lsm_size);
142                         RETURN(-ENOMEM);
143                 }
144                 loi_init((*lsmp)->lsm_oinfo[0]);
145         }
146
147         if (lmm != NULL) {
148                 /* XXX zero *lsmp? */
149                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
150                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
151                 LASSERT((*lsmp)->lsm_object_id);
152                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
153         }
154
155         if (imp != NULL &&
156             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
157                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
158         else
159                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
264                        struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
308                        struct obd_info *oinfo, struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(const struct lu_env *env,
349                                  struct ptlrpc_request *req,
350                                  struct osc_setattr_args *sa, int rc)
351 {
352         struct ost_body *body;
353         ENTRY;
354
355         if (rc != 0)
356                 GOTO(out, rc);
357
358         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
359         if (body == NULL)
360                 GOTO(out, rc = -EPROTO);
361
362         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
363 out:
364         rc = sa->sa_upcall(sa->sa_cookie, rc);
365         RETURN(rc);
366 }
367
368 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
369                            struct obd_trans_info *oti,
370                            obd_enqueue_update_f upcall, void *cookie,
371                            struct ptlrpc_request_set *rqset)
372 {
373         struct ptlrpc_request   *req;
374         struct osc_setattr_args *sa;
375         int                      rc;
376         ENTRY;
377
378         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
379         if (req == NULL)
380                 RETURN(-ENOMEM);
381
382         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
383         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
384         if (rc) {
385                 ptlrpc_request_free(req);
386                 RETURN(rc);
387         }
388
389         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
390                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
391
392         osc_pack_req_body(req, oinfo);
393
394         ptlrpc_request_set_replen(req);
395
396         /* do mds to ost setattr asynchronously */
397         if (!rqset) {
398                 /* Do not wait for response. */
399                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
400         } else {
401                 req->rq_interpret_reply =
402                         (ptlrpc_interpterer_t)osc_setattr_interpret;
403
404                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
405                 sa = ptlrpc_req_async_args(req);
406                 sa->sa_oa = oinfo->oi_oa;
407                 sa->sa_upcall = upcall;
408                 sa->sa_cookie = cookie;
409
410                 if (rqset == PTLRPCD_SET)
411                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
412                 else
413                         ptlrpc_set_add_req(rqset, req);
414         }
415
416         RETURN(0);
417 }
418
419 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
420                              struct obd_trans_info *oti,
421                              struct ptlrpc_request_set *rqset)
422 {
423         return osc_setattr_async_base(exp, oinfo, oti,
424                                       oinfo->oi_cb_up, oinfo, rqset);
425 }
426
427 int osc_real_create(struct obd_export *exp, struct obdo *oa,
428                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
429 {
430         struct ptlrpc_request *req;
431         struct ost_body       *body;
432         struct lov_stripe_md  *lsm;
433         int                    rc;
434         ENTRY;
435
436         LASSERT(oa);
437         LASSERT(ea);
438
439         lsm = *ea;
440         if (!lsm) {
441                 rc = obd_alloc_memmd(exp, &lsm);
442                 if (rc < 0)
443                         RETURN(rc);
444         }
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
447         if (req == NULL)
448                 GOTO(out, rc = -ENOMEM);
449
450         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 GOTO(out, rc);
454         }
455
456         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
457         LASSERT(body);
458         lustre_set_wire_obdo(&body->oa, oa);
459
460         ptlrpc_request_set_replen(req);
461
462         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
463             oa->o_flags == OBD_FL_DELORPHAN) {
464                 DEBUG_REQ(D_HA, req,
465                           "delorphan from OST integration");
466                 /* Don't resend the delorphan req */
467                 req->rq_no_resend = req->rq_no_delay = 1;
468         }
469
470         rc = ptlrpc_queue_wait(req);
471         if (rc)
472                 GOTO(out_req, rc);
473
474         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
475         if (body == NULL)
476                 GOTO(out_req, rc = -EPROTO);
477
478         lustre_get_wire_obdo(oa, &body->oa);
479
480         /* This should really be sent by the OST */
481         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
482         oa->o_valid |= OBD_MD_FLBLKSZ;
483
484         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
485          * have valid lsm_oinfo data structs, so don't go touching that.
486          * This needs to be fixed in a big way.
487          */
488         lsm->lsm_object_id = oa->o_id;
489         lsm->lsm_object_seq = oa->o_seq;
490         *ea = lsm;
491
492         if (oti != NULL) {
493                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
494
495                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
496                         if (!oti->oti_logcookies)
497                                 oti_alloc_cookies(oti, 1);
498                         *oti->oti_logcookies = oa->o_lcookie;
499                 }
500         }
501
502         CDEBUG(D_HA, "transno: "LPD64"\n",
503                lustre_msg_get_transno(req->rq_repmsg));
504 out_req:
505         ptlrpc_req_finished(req);
506 out:
507         if (rc && !*ea)
508                 obd_free_memmd(exp, &lsm);
509         RETURN(rc);
510 }
511
512 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
513                    obd_enqueue_update_f upcall, void *cookie,
514                    struct ptlrpc_request_set *rqset)
515 {
516         struct ptlrpc_request   *req;
517         struct osc_setattr_args *sa;
518         struct ost_body         *body;
519         int                      rc;
520         ENTRY;
521
522         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
523         if (req == NULL)
524                 RETURN(-ENOMEM);
525
526         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
527         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
528         if (rc) {
529                 ptlrpc_request_free(req);
530                 RETURN(rc);
531         }
532         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
533         ptlrpc_at_set_req_timeout(req);
534
535         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
536         LASSERT(body);
537         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
538         osc_pack_capa(req, body, oinfo->oi_capa);
539
540         ptlrpc_request_set_replen(req);
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
557                      struct obd_info *oinfo, struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_fsync_args *fa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *fa->fa_oi->oi_oa = body->oa;
585 out:
586         rc = fa->fa_upcall(fa->fa_cookie, rc);
587         RETURN(rc);
588 }
589
590 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
591                   obd_enqueue_update_f upcall, void *cookie,
592                   struct ptlrpc_request_set *rqset)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_fsync_args *fa;
597         int                    rc;
598         ENTRY;
599
600         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
601         if (req == NULL)
602                 RETURN(-ENOMEM);
603
604         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
605         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
606         if (rc) {
607                 ptlrpc_request_free(req);
608                 RETURN(rc);
609         }
610
611         /* overload the size and blocks fields in the oa with start/end */
612         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
613         LASSERT(body);
614         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
615         osc_pack_capa(req, body, oinfo->oi_capa);
616
617         ptlrpc_request_set_replen(req);
618         req->rq_interpret_reply = osc_sync_interpret;
619
620         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
621         fa = ptlrpc_req_async_args(req);
622         fa->fa_oi = oinfo;
623         fa->fa_upcall = upcall;
624         fa->fa_cookie = cookie;
625
626         if (rqset == PTLRPCD_SET)
627                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
628         else
629                 ptlrpc_set_add_req(rqset, req);
630
631         RETURN (0);
632 }
633
634 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
635                     struct obd_info *oinfo, obd_size start, obd_size end,
636                     struct ptlrpc_request_set *set)
637 {
638         ENTRY;
639
640         if (!oinfo->oi_oa) {
641                 CDEBUG(D_INFO, "oa NULL\n");
642                 RETURN(-EINVAL);
643         }
644
645         oinfo->oi_oa->o_size = start;
646         oinfo->oi_oa->o_blocks = end;
647         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
648
649         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
650 }
651
652 /* Find and cancel locally locks matched by @mode in the resource found by
653  * @objid. Found locks are added into @cancel list. Returns the amount of
654  * locks added to @cancels list. */
655 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
656                                    cfs_list_t *cancels,
657                                    ldlm_mode_t mode, int lock_flags)
658 {
659         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
660         struct ldlm_res_id res_id;
661         struct ldlm_resource *res;
662         int count;
663         ENTRY;
664
665         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
666         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
667         if (res == NULL)
668                 RETURN(0);
669
670         LDLM_RESOURCE_ADDREF(res);
671         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
672                                            lock_flags, 0, NULL);
673         LDLM_RESOURCE_DELREF(res);
674         ldlm_resource_putref(res);
675         RETURN(count);
676 }
677
678 static int osc_destroy_interpret(const struct lu_env *env,
679                                  struct ptlrpc_request *req, void *data,
680                                  int rc)
681 {
682         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
683
684         cfs_atomic_dec(&cli->cl_destroy_in_flight);
685         cfs_waitq_signal(&cli->cl_destroy_waitq);
686         return 0;
687 }
688
689 static int osc_can_send_destroy(struct client_obd *cli)
690 {
691         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
692             cli->cl_max_rpcs_in_flight) {
693                 /* The destroy request can be sent */
694                 return 1;
695         }
696         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
697             cli->cl_max_rpcs_in_flight) {
698                 /*
699                  * The counter has been modified between the two atomic
700                  * operations.
701                  */
702                 cfs_waitq_signal(&cli->cl_destroy_waitq);
703         }
704         return 0;
705 }
706
707 /* Destroy requests can be async always on the client, and we don't even really
708  * care about the return code since the client cannot do anything at all about
709  * a destroy failure.
710  * When the MDS is unlinking a filename, it saves the file objects into a
711  * recovery llog, and these object records are cancelled when the OST reports
712  * they were destroyed and sync'd to disk (i.e. transaction committed).
713  * If the client dies, or the OST is down when the object should be destroyed,
714  * the records are not cancelled, and when the OST reconnects to the MDS next,
715  * it will retrieve the llog unlink logs and then sends the log cancellation
716  * cookies to the MDS after committing destroy transactions. */
717 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
718                        struct obdo *oa, struct lov_stripe_md *ea,
719                        struct obd_trans_info *oti, struct obd_export *md_export,
720                        void *capa)
721 {
722         struct client_obd     *cli = &exp->exp_obd->u.cli;
723         struct ptlrpc_request *req;
724         struct ost_body       *body;
725         CFS_LIST_HEAD(cancels);
726         int rc, count;
727         ENTRY;
728
729         if (!oa) {
730                 CDEBUG(D_INFO, "oa NULL\n");
731                 RETURN(-EINVAL);
732         }
733
734         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
735                                         LDLM_FL_DISCARD_DATA);
736
737         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
738         if (req == NULL) {
739                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
740                 RETURN(-ENOMEM);
741         }
742
743         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
744         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
745                                0, &cancels, count);
746         if (rc) {
747                 ptlrpc_request_free(req);
748                 RETURN(rc);
749         }
750
751         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
752         ptlrpc_at_set_req_timeout(req);
753
754         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
755                 oa->o_lcookie = *oti->oti_logcookies;
756         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
757         LASSERT(body);
758         lustre_set_wire_obdo(&body->oa, oa);
759
760         osc_pack_capa(req, body, (struct obd_capa *)capa);
761         ptlrpc_request_set_replen(req);
762
763         /* don't throttle destroy RPCs for the MDT */
764         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
765                 req->rq_interpret_reply = osc_destroy_interpret;
766                 if (!osc_can_send_destroy(cli)) {
767                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
768                                                           NULL);
769
770                         /*
771                          * Wait until the number of on-going destroy RPCs drops
772                          * under max_rpc_in_flight
773                          */
774                         l_wait_event_exclusive(cli->cl_destroy_waitq,
775                                                osc_can_send_destroy(cli), &lwi);
776                 }
777         }
778
779         /* Do not wait for response */
780         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
781         RETURN(0);
782 }
783
784 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
785                                 long writing_bytes)
786 {
787         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
788
789         LASSERT(!(oa->o_valid & bits));
790
791         oa->o_valid |= bits;
792         client_obd_list_lock(&cli->cl_loi_list_lock);
793         oa->o_dirty = cli->cl_dirty;
794         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
795                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
796                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
797                 oa->o_undirty = 0;
798         } else if (cfs_atomic_read(&obd_dirty_pages) -
799                    cfs_atomic_read(&obd_dirty_transit_pages) >
800                    obd_max_dirty_pages + 1){
801                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
802                  * not covered by a lock thus they may safely race and trip
803                  * this CERROR() unless we add in a small fudge factor (+1). */
804                 CERROR("dirty %d - %d > system dirty_max %d\n",
805                        cfs_atomic_read(&obd_dirty_pages),
806                        cfs_atomic_read(&obd_dirty_transit_pages),
807                        obd_max_dirty_pages);
808                 oa->o_undirty = 0;
809         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
810                 CERROR("dirty %lu - dirty_max %lu too big???\n",
811                        cli->cl_dirty, cli->cl_dirty_max);
812                 oa->o_undirty = 0;
813         } else {
814                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
815                                 (cli->cl_max_rpcs_in_flight + 1);
816                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
817         }
818         oa->o_grant = cli->cl_avail_grant;
819         oa->o_dropped = cli->cl_lost_grant;
820         cli->cl_lost_grant = 0;
821         client_obd_list_unlock(&cli->cl_loi_list_lock);
822         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
823                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
824
825 }
826
827 void osc_update_next_shrink(struct client_obd *cli)
828 {
829         cli->cl_next_shrink_grant =
830                 cfs_time_shift(cli->cl_grant_shrink_interval);
831         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
832                cli->cl_next_shrink_grant);
833 }
834
835 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
836 {
837         client_obd_list_lock(&cli->cl_loi_list_lock);
838         cli->cl_avail_grant += grant;
839         client_obd_list_unlock(&cli->cl_loi_list_lock);
840 }
841
842 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
843 {
844         if (body->oa.o_valid & OBD_MD_FLGRANT) {
845                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
846                 __osc_update_grant(cli, body->oa.o_grant);
847         }
848 }
849
850 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
851                               obd_count keylen, void *key, obd_count vallen,
852                               void *val, struct ptlrpc_request_set *set);
853
854 static int osc_shrink_grant_interpret(const struct lu_env *env,
855                                       struct ptlrpc_request *req,
856                                       void *aa, int rc)
857 {
858         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
859         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
860         struct ost_body *body;
861
862         if (rc != 0) {
863                 __osc_update_grant(cli, oa->o_grant);
864                 GOTO(out, rc);
865         }
866
867         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
868         LASSERT(body);
869         osc_update_grant(cli, body);
870 out:
871         OBDO_FREE(oa);
872         return rc;
873 }
874
875 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         oa->o_grant = cli->cl_avail_grant / 4;
879         cli->cl_avail_grant -= oa->o_grant;
880         client_obd_list_unlock(&cli->cl_loi_list_lock);
881         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
882                 oa->o_valid |= OBD_MD_FLFLAGS;
883                 oa->o_flags = 0;
884         }
885         oa->o_flags |= OBD_FL_SHRINK_GRANT;
886         osc_update_next_shrink(cli);
887 }
888
889 /* Shrink the current grant, either from some large amount to enough for a
890  * full set of in-flight RPCs, or if we have already shrunk to that limit
891  * then to enough for a single RPC.  This avoids keeping more grant than
892  * needed, and avoids shrinking the grant piecemeal. */
893 static int osc_shrink_grant(struct client_obd *cli)
894 {
895         long target = (cli->cl_max_rpcs_in_flight + 1) *
896                       cli->cl_max_pages_per_rpc;
897
898         client_obd_list_lock(&cli->cl_loi_list_lock);
899         if (cli->cl_avail_grant <= target)
900                 target = cli->cl_max_pages_per_rpc;
901         client_obd_list_unlock(&cli->cl_loi_list_lock);
902
903         return osc_shrink_grant_to_target(cli, target);
904 }
905
906 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
907 {
908         int    rc = 0;
909         struct ost_body     *body;
910         ENTRY;
911
912         client_obd_list_lock(&cli->cl_loi_list_lock);
913         /* Don't shrink if we are already above or below the desired limit
914          * We don't want to shrink below a single RPC, as that will negatively
915          * impact block allocation and long-term performance. */
916         if (target < cli->cl_max_pages_per_rpc)
917                 target = cli->cl_max_pages_per_rpc;
918
919         if (target >= cli->cl_avail_grant) {
920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
921                 RETURN(0);
922         }
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924
925         OBD_ALLOC_PTR(body);
926         if (!body)
927                 RETURN(-ENOMEM);
928
929         osc_announce_cached(cli, &body->oa, 0);
930
931         client_obd_list_lock(&cli->cl_loi_list_lock);
932         body->oa.o_grant = cli->cl_avail_grant - target;
933         cli->cl_avail_grant = target;
934         client_obd_list_unlock(&cli->cl_loi_list_lock);
935         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
936                 body->oa.o_valid |= OBD_MD_FLFLAGS;
937                 body->oa.o_flags = 0;
938         }
939         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
940         osc_update_next_shrink(cli);
941
942         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
943                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
944                                 sizeof(*body), body, NULL);
945         if (rc != 0)
946                 __osc_update_grant(cli, body->oa.o_grant);
947         OBD_FREE_PTR(body);
948         RETURN(rc);
949 }
950
951 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
952 static int osc_should_shrink_grant(struct client_obd *client)
953 {
954         cfs_time_t time = cfs_time_current();
955         cfs_time_t next_shrink = client->cl_next_shrink_grant;
956
957         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
958              OBD_CONNECT_GRANT_SHRINK) == 0)
959                 return 0;
960
961         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
962                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
963                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
964                         return 1;
965                 else
966                         osc_update_next_shrink(client);
967         }
968         return 0;
969 }
970
971 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
972 {
973         struct client_obd *client;
974
975         cfs_list_for_each_entry(client, &item->ti_obd_list,
976                                 cl_grant_shrink_list) {
977                 if (osc_should_shrink_grant(client))
978                         osc_shrink_grant(client);
979         }
980         return 0;
981 }
982
983 static int osc_add_shrink_grant(struct client_obd *client)
984 {
985         int rc;
986
987         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
988                                        TIMEOUT_GRANT,
989                                        osc_grant_shrink_grant_cb, NULL,
990                                        &client->cl_grant_shrink_list);
991         if (rc) {
992                 CERROR("add grant client %s error %d\n",
993                         client->cl_import->imp_obd->obd_name, rc);
994                 return rc;
995         }
996         CDEBUG(D_CACHE, "add grant client %s \n",
997                client->cl_import->imp_obd->obd_name);
998         osc_update_next_shrink(client);
999         return 0;
1000 }
1001
1002 static int osc_del_shrink_grant(struct client_obd *client)
1003 {
1004         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1005                                          TIMEOUT_GRANT);
1006 }
1007
1008 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1009 {
1010         /*
1011          * ocd_grant is the total grant amount we're expect to hold: if we've
1012          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1013          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1014          *
1015          * race is tolerable here: if we're evicted, but imp_state already
1016          * left EVICTED state, then cl_dirty must be 0 already.
1017          */
1018         client_obd_list_lock(&cli->cl_loi_list_lock);
1019         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1020                 cli->cl_avail_grant = ocd->ocd_grant;
1021         else
1022                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1023
1024         if (cli->cl_avail_grant < 0) {
1025                 CWARN("%s: available grant < 0, the OSS is probably not running"
1026                       " with patch from bug20278 (%ld) \n",
1027                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1028                 /* workaround for 1.6 servers which do not have
1029                  * the patch from bug20278 */
1030                 cli->cl_avail_grant = ocd->ocd_grant;
1031         }
1032
1033         client_obd_list_unlock(&cli->cl_loi_list_lock);
1034
1035         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1036                cli->cl_import->imp_obd->obd_name,
1037                cli->cl_avail_grant, cli->cl_lost_grant);
1038
1039         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1040             cfs_list_empty(&cli->cl_grant_shrink_list))
1041                 osc_add_shrink_grant(cli);
1042 }
1043
1044 /* We assume that the reason this OSC got a short read is because it read
1045  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1046  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1047  * this stripe never got written at or beyond this stripe offset yet. */
1048 static void handle_short_read(int nob_read, obd_count page_count,
1049                               struct brw_page **pga)
1050 {
1051         char *ptr;
1052         int i = 0;
1053
1054         /* skip bytes read OK */
1055         while (nob_read > 0) {
1056                 LASSERT (page_count > 0);
1057
1058                 if (pga[i]->count > nob_read) {
1059                         /* EOF inside this page */
1060                         ptr = cfs_kmap(pga[i]->pg) +
1061                                 (pga[i]->off & ~CFS_PAGE_MASK);
1062                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1063                         cfs_kunmap(pga[i]->pg);
1064                         page_count--;
1065                         i++;
1066                         break;
1067                 }
1068
1069                 nob_read -= pga[i]->count;
1070                 page_count--;
1071                 i++;
1072         }
1073
1074         /* zero remaining pages */
1075         while (page_count-- > 0) {
1076                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1077                 memset(ptr, 0, pga[i]->count);
1078                 cfs_kunmap(pga[i]->pg);
1079                 i++;
1080         }
1081 }
1082
1083 static int check_write_rcs(struct ptlrpc_request *req,
1084                            int requested_nob, int niocount,
1085                            obd_count page_count, struct brw_page **pga)
1086 {
1087         int     i;
1088         __u32   *remote_rcs;
1089
1090         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1091                                                   sizeof(*remote_rcs) *
1092                                                   niocount);
1093         if (remote_rcs == NULL) {
1094                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1095                 return(-EPROTO);
1096         }
1097
1098         /* return error if any niobuf was in error */
1099         for (i = 0; i < niocount; i++) {
1100                 if ((int)remote_rcs[i] < 0)
1101                         return(remote_rcs[i]);
1102
1103                 if (remote_rcs[i] != 0) {
1104                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1105                                 i, remote_rcs[i], req);
1106                         return(-EPROTO);
1107                 }
1108         }
1109
1110         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1111                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1112                        req->rq_bulk->bd_nob_transferred, requested_nob);
1113                 return(-EPROTO);
1114         }
1115
1116         return (0);
1117 }
1118
1119 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1120 {
1121         if (p1->flag != p2->flag) {
1122                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1123                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1124
1125                 /* warn if we try to combine flags that we don't know to be
1126                  * safe to combine */
1127                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1128                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1129                               "report this at http://bugs.whamcloud.com/\n",
1130                               p1->flag, p2->flag);
1131                 }
1132                 return 0;
1133         }
1134
1135         return (p1->off + p1->count == p2->off);
1136 }
1137
1138 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1139                                    struct brw_page **pga, int opc,
1140                                    cksum_type_t cksum_type)
1141 {
1142         __u32                           cksum;
1143         int                             i = 0;
1144         struct cfs_crypto_hash_desc     *hdesc;
1145         unsigned int                    bufsize;
1146         int                             err;
1147         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1148
1149         LASSERT(pg_count > 0);
1150
1151         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1152         if (IS_ERR(hdesc)) {
1153                 CERROR("Unable to initialize checksum hash %s\n",
1154                        cfs_crypto_hash_name(cfs_alg));
1155                 return PTR_ERR(hdesc);
1156         }
1157
1158         while (nob > 0 && pg_count > 0) {
1159                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1160
1161                 /* corrupt the data before we compute the checksum, to
1162                  * simulate an OST->client data error */
1163                 if (i == 0 && opc == OST_READ &&
1164                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1165                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1166                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1167                         memcpy(ptr + off, "bad1", min(4, nob));
1168                         cfs_kunmap(pga[i]->pg);
1169                 }
1170                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1171                                   pga[i]->off & ~CFS_PAGE_MASK,
1172                                   count);
1173                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1174                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1175
1176                 nob -= pga[i]->count;
1177                 pg_count--;
1178                 i++;
1179         }
1180
1181         bufsize = 4;
1182         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1183
1184         if (err)
1185                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1186
1187         /* For sending we only compute the wrong checksum instead
1188          * of corrupting the data so it is still correct on a redo */
1189         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1190                 cksum++;
1191
1192         return cksum;
1193 }
1194
/*
 * Build a bulk read or write (BRW) request for the @page_count pages in @pga.
 *
 * @cmd        OBD_BRW_WRITE selects an OST_WRITE request, anything else an
 *             OST_READ request
 * @cli        client the request is issued for
 * @oa         object attributes copied into the request body; for writes its
 *             checksum fields are updated as well, and it is remembered in
 *             the request's async args
 * @lsm        not referenced by this function
 * @page_count number of entries in @pga, must be > 0
 * @pga        pages to transfer, in strictly increasing offset order
 * @reqp       receives the prepared request on success
 * @ocapa      capability packed into the request
 * @reserve    when non-zero and @ocapa is set, a reference obtained with
 *             capa_get() is stored in the async args
 * @resend     when non-zero, the request is flagged OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success, -ENOMEM if the request or the bulk descriptor cannot
 * be allocated, the error from ptlrpc_request_pack(), or the error forced by
 * one of the OBD_FAIL_CHECK() hooks.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        /* presumably test-only fault injection points */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* write requests come from the import's request pool, reads are
         * allocated normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* one remote niobuf per run of mergeable pages: start at one and
         * add one for every adjacent pair that cannot be merged */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        /* size the variable-length request fields before packing */
        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* add every page to the bulk descriptor and describe it in the
         * niobuf array; niobuf is advanced on every iteration and stepped
         * back whenever a page is merged into the previous entry */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must come in strictly increasing offset order */
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* OBD_BRW_SRVLOCK must be the same on every page */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* exactly niocount niobufs must have been filled in */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        /* only writes pass the number of bytes being written */
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this request when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                /* copied in both branches above, i.e. even when no checksum
                 * was computed for this request */
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only announce the checksum type; nothing is
                 * computed here */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* record what was requested in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1395
1396 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1397                                 __u32 client_cksum, __u32 server_cksum, int nob,
1398                                 obd_count page_count, struct brw_page **pga,
1399                                 cksum_type_t client_cksum_type)
1400 {
1401         __u32 new_cksum;
1402         char *msg;
1403         cksum_type_t cksum_type;
1404
1405         if (server_cksum == client_cksum) {
1406                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1407                 return 0;
1408         }
1409
1410         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1411                                        oa->o_flags : 0);
1412         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1413                                       cksum_type);
1414
1415         if (cksum_type != client_cksum_type)
1416                 msg = "the server did not use the checksum type specified in "
1417                       "the original request - likely a protocol problem";
1418         else if (new_cksum == server_cksum)
1419                 msg = "changed on the client after we checksummed it - "
1420                       "likely false positive due to mmap IO (bug 11742)";
1421         else if (new_cksum == client_cksum)
1422                 msg = "changed in transit before arrival at OST";
1423         else
1424                 msg = "changed in transit AND doesn't match the original - "
1425                       "likely false positive due to mmap IO (bug 11742)";
1426
1427         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1428                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1429                            msg, libcfs_nid2str(peer->nid),
1430                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1431                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1432                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1433                            oa->o_id,
1434                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1435                            pga[0]->off,
1436                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1437         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1438                "client csum now %x\n", client_cksum, client_cksum_type,
1439                server_cksum, cksum_type, new_cksum);
1440         return 1;
1441 }
1442
1443 /* Note rc enters this function as number of bytes transferred */
1444 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1445 {
1446         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1447         const lnet_process_id_t *peer =
1448                         &req->rq_import->imp_connection->c_peer;
1449         struct client_obd *cli = aa->aa_cli;
1450         struct ost_body *body;
1451         __u32 client_cksum = 0;
1452         ENTRY;
1453
1454         if (rc < 0 && rc != -EDQUOT) {
1455                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1456                 RETURN(rc);
1457         }
1458
1459         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1460         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1461         if (body == NULL) {
1462                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1463                 RETURN(-EPROTO);
1464         }
1465
1466         /* set/clear over quota flag for a uid/gid */
1467         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1468             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1469                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1470
1471                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1472                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1473                        body->oa.o_flags);
1474                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1475         }
1476
1477         osc_update_grant(cli, body);
1478
1479         if (rc < 0)
1480                 RETURN(rc);
1481
1482         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1483                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1484
1485         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1486                 if (rc > 0) {
1487                         CERROR("Unexpected +ve rc %d\n", rc);
1488                         RETURN(-EPROTO);
1489                 }
1490                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1491
1492                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1493                         RETURN(-EAGAIN);
1494
1495                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1496                     check_write_checksum(&body->oa, peer, client_cksum,
1497                                          body->oa.o_cksum, aa->aa_requested_nob,
1498                                          aa->aa_page_count, aa->aa_ppga,
1499                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1500                         RETURN(-EAGAIN);
1501
1502                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1503                                      aa->aa_page_count, aa->aa_ppga);
1504                 GOTO(out, rc);
1505         }
1506
1507         /* The rest of this function executes only for OST_READs */
1508
1509         /* if unwrap_bulk failed, return -EAGAIN to retry */
1510         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1511         if (rc < 0)
1512                 GOTO(out, rc = -EAGAIN);
1513
1514         if (rc > aa->aa_requested_nob) {
1515                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1516                        aa->aa_requested_nob);
1517                 RETURN(-EPROTO);
1518         }
1519
1520         if (rc != req->rq_bulk->bd_nob_transferred) {
1521                 CERROR ("Unexpected rc %d (%d transferred)\n",
1522                         rc, req->rq_bulk->bd_nob_transferred);
1523                 return (-EPROTO);
1524         }
1525
1526         if (rc < aa->aa_requested_nob)
1527                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1528
1529         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1530                 static int cksum_counter;
1531                 __u32      server_cksum = body->oa.o_cksum;
1532                 char      *via;
1533                 char      *router;
1534                 cksum_type_t cksum_type;
1535
1536                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1537                                                body->oa.o_flags : 0);
1538                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1539                                                  aa->aa_ppga, OST_READ,
1540                                                  cksum_type);
1541
1542                 if (peer->nid == req->rq_bulk->bd_sender) {
1543                         via = router = "";
1544                 } else {
1545                         via = " via ";
1546                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1547                 }
1548
1549                 if (server_cksum == ~0 && rc > 0) {
1550                         CERROR("Protocol error: server %s set the 'checksum' "
1551                                "bit, but didn't send a checksum.  Not fatal, "
1552                                "but please notify on http://bugs.whamcloud.com/\n",
1553                                libcfs_nid2str(peer->nid));
1554                 } else if (server_cksum != client_cksum) {
1555                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1556                                            "%s%s%s inode "DFID" object "
1557                                            LPU64"/"LPU64" extent "
1558                                            "["LPU64"-"LPU64"]\n",
1559                                            req->rq_import->imp_obd->obd_name,
1560                                            libcfs_nid2str(peer->nid),
1561                                            via, router,
1562                                            body->oa.o_valid & OBD_MD_FLFID ?
1563                                                 body->oa.o_parent_seq : (__u64)0,
1564                                            body->oa.o_valid & OBD_MD_FLFID ?
1565                                                 body->oa.o_parent_oid : 0,
1566                                            body->oa.o_valid & OBD_MD_FLFID ?
1567                                                 body->oa.o_parent_ver : 0,
1568                                            body->oa.o_id,
1569                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1570                                                 body->oa.o_seq : (__u64)0,
1571                                            aa->aa_ppga[0]->off,
1572                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1573                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1574                                                                         1);
1575                         CERROR("client %x, server %x, cksum_type %x\n",
1576                                client_cksum, server_cksum, cksum_type);
1577                         cksum_counter = 0;
1578                         aa->aa_oa->o_cksum = client_cksum;
1579                         rc = -EAGAIN;
1580                 } else {
1581                         cksum_counter++;
1582                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1583                         rc = 0;
1584                 }
1585         } else if (unlikely(client_cksum)) {
1586                 static int cksum_missed;
1587
1588                 cksum_missed++;
1589                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1590                         CERROR("Checksum %u requested from %s but not sent\n",
1591                                cksum_missed, libcfs_nid2str(peer->nid));
1592         } else {
1593                 rc = 0;
1594         }
1595 out:
1596         if (rc >= 0)
1597                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1598
1599         RETURN(rc);
1600 }
1601
1602 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1603                             struct lov_stripe_md *lsm,
1604                             obd_count page_count, struct brw_page **pga,
1605                             struct obd_capa *ocapa)
1606 {
1607         struct ptlrpc_request *req;
1608         int                    rc;
1609         cfs_waitq_t            waitq;
1610         int                    generation, resends = 0;
1611         struct l_wait_info     lwi;
1612
1613         ENTRY;
1614
1615         cfs_waitq_init(&waitq);
1616         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1617
1618 restart_bulk:
1619         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1620                                   page_count, pga, &req, ocapa, 0, resends);
1621         if (rc != 0)
1622                 return (rc);
1623
1624         if (resends) {
1625                 req->rq_generation_set = 1;
1626                 req->rq_import_generation = generation;
1627                 req->rq_sent = cfs_time_current_sec() + resends;
1628         }
1629
1630         rc = ptlrpc_queue_wait(req);
1631
1632         if (rc == -ETIMEDOUT && req->rq_resend) {
1633                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1634                 ptlrpc_req_finished(req);
1635                 goto restart_bulk;
1636         }
1637
1638         rc = osc_brw_fini_request(req, rc);
1639
1640         ptlrpc_req_finished(req);
1641         /* When server return -EINPROGRESS, client should always retry
1642          * regardless of the number of times the bulk was resent already.*/
1643         if (osc_recoverable_error(rc)) {
1644                 resends++;
1645                 if (rc != -EINPROGRESS &&
1646                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1647                         CERROR("%s: too many resend retries for object: "
1648                                ""LPU64":"LPU64", rc = %d.\n",
1649                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1650                         goto out;
1651                 }
1652                 if (generation !=
1653                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1654                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1655                                ""LPU64":"LPU64", rc = %d.\n",
1656                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1657                         goto out;
1658                 }
1659
1660                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1661                                        NULL);
1662                 l_wait_event(waitq, 0, &lwi);
1663
1664                 goto restart_bulk;
1665         }
1666 out:
1667         if (rc == -EAGAIN || rc == -EINPROGRESS)
1668                 rc = -EIO;
1669         RETURN (rc);
1670 }
1671
/*
 * Build a replacement bulk RPC for \a request after a recoverable error and
 * add it to the request set \a request belongs to.
 *
 * The new request is prepared from the same client, obdo, page array and
 * capability stored in \a aa.  It inherits the interpret callback, the async
 * args and the import generation of the old request, and takes over the oap
 * list and the capability reference.
 *
 * Returns 0 on success, the error from osc_brw_prep_request() if the new
 * request cannot be built, or -EINTR if an oap holding a reference on the
 * old request has been interrupted.
 */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* the command (read or write) is recovered from the opcode of the
         * request being redone */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* give up, dropping the freshly built request, if any oap that
         * references the old request was interrupted in the meantime */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by one second per resend performed so far */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* the list head copied with rq_async_args is re-initialised and the
         * entries are moved over from the old args, which are left empty */
        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* swap every oap's request reference from the old request to the
         * new one */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* hand the capability over to the new args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1743
1744 /*
1745  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1746  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1747  * fine for our small page arrays and doesn't require allocation.  its an
1748  * insertion sort that swaps elements that are strides apart, shrinking the
1749  * stride down until its '1' and the array is sorted.
1750  */
1751 static void sort_brw_pages(struct brw_page **array, int num)
1752 {
1753         int stride, i, j;
1754         struct brw_page *tmp;
1755
1756         if (num == 1)
1757                 return;
1758         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1759                 ;
1760
1761         do {
1762                 stride /= 3;
1763                 for (i = stride ; i < num ; i++) {
1764                         tmp = array[i];
1765                         j = i;
1766                         while (j >= stride && array[j - stride]->off > tmp->off) {
1767                                 array[j] = array[j - stride];
1768                                 j -= stride;
1769                         }
1770                         array[j] = tmp;
1771                 }
1772         } while (stride > 1);
1773 }
1774
1775 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1776 {
1777         int count = 1;
1778         int offset;
1779         int i = 0;
1780
1781         LASSERT (pages > 0);
1782         offset = pg[i]->off & ~CFS_PAGE_MASK;
1783
1784         for (;;) {
1785                 pages--;
1786                 if (pages == 0)         /* that's all */
1787                         return count;
1788
1789                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1790                         return count;   /* doesn't end on page boundary */
1791
1792                 i++;
1793                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1794                 if (offset != 0)        /* doesn't start on page boundary */
1795                         return count;
1796
1797                 count++;
1798         }
1799 }
1800
1801 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1802 {
1803         struct brw_page **ppga;
1804         int i;
1805
1806         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1807         if (ppga == NULL)
1808                 return NULL;
1809
1810         for (i = 0; i < count; i++)
1811                 ppga[i] = pga + i;
1812         return ppga;
1813 }
1814
1815 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1816 {
1817         LASSERT(ppga != NULL);
1818         OBD_FREE(ppga, sizeof(*ppga) * count);
1819 }
1820
1821 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1822                    obd_count page_count, struct brw_page *pga,
1823                    struct obd_trans_info *oti)
1824 {
1825         struct obdo *saved_oa = NULL;
1826         struct brw_page **ppga, **orig;
1827         struct obd_import *imp = class_exp2cliimp(exp);
1828         struct client_obd *cli;
1829         int rc, page_count_orig;
1830         ENTRY;
1831
1832         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1833         cli = &imp->imp_obd->u.cli;
1834
1835         if (cmd & OBD_BRW_CHECK) {
1836                 /* The caller just wants to know if there's a chance that this
1837                  * I/O can succeed */
1838
1839                 if (imp->imp_invalid)
1840                         RETURN(-EIO);
1841                 RETURN(0);
1842         }
1843
1844         /* test_brw with a failed create can trip this, maybe others. */
1845         LASSERT(cli->cl_max_pages_per_rpc);
1846
1847         rc = 0;
1848
1849         orig = ppga = osc_build_ppga(pga, page_count);
1850         if (ppga == NULL)
1851                 RETURN(-ENOMEM);
1852         page_count_orig = page_count;
1853
1854         sort_brw_pages(ppga, page_count);
1855         while (page_count) {
1856                 obd_count pages_per_brw;
1857
1858                 if (page_count > cli->cl_max_pages_per_rpc)
1859                         pages_per_brw = cli->cl_max_pages_per_rpc;
1860                 else
1861                         pages_per_brw = page_count;
1862
1863                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1864
1865                 if (saved_oa != NULL) {
1866                         /* restore previously saved oa */
1867                         *oinfo->oi_oa = *saved_oa;
1868                 } else if (page_count > pages_per_brw) {
1869                         /* save a copy of oa (brw will clobber it) */
1870                         OBDO_ALLOC(saved_oa);
1871                         if (saved_oa == NULL)
1872                                 GOTO(out, rc = -ENOMEM);
1873                         *saved_oa = *oinfo->oi_oa;
1874                 }
1875
1876                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1877                                       pages_per_brw, ppga, oinfo->oi_capa);
1878
1879                 if (rc != 0)
1880                         break;
1881
1882                 page_count -= pages_per_brw;
1883                 ppga += pages_per_brw;
1884         }
1885
1886 out:
1887         osc_release_ppga(orig, page_count_orig);
1888
1889         if (saved_oa != NULL)
1890                 OBDO_FREE(saved_oa);
1891
1892         RETURN(rc);
1893 }
1894
/*
 * Reply interpreter for bulk RPCs; \a data is the request's
 * struct osc_brw_async_args.
 *
 * The reply is first checked by osc_brw_fini_request().  On a recoverable
 * error the request is redone via osc_brw_redo_request() unless the import
 * generation has changed or client_should_resend() refuses (the latter is
 * not consulted for -EINPROGRESS).  If the redo was queued successfully the
 * function returns 0 immediately and leaves completion to the new request.
 *
 * Otherwise the RPC is completed here: the capability is released, the
 * in-flight counter is decremented, every oap on the list is completed with
 * the final rc, and the obdo and page pointer array are freed.
 *
 * Returns the final rc; a remaining -EAGAIN or -EINPROGRESS is turned into
 * -EIO.
 */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        /* generation changed: do not resend, rc is kept */
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
                } else if (rc == -EINPROGRESS ||
                    client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa);
                } else {
                        CERROR("%s: too many resent retries for object: "
                               ""LPU64":"LPU64", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
                }

                /* rc == 0 here means the redo request was queued, so the
                 * new request's interpreter will finish the job */
                if (rc == 0)
                        RETURN(0);
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
                        oap_rpc_item) {
                cfs_list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
        }
        /* the obdo is no longer needed once every oap has been completed */
        OBDO_FREE(aa->aa_oa);

        osc_wake_cache_waiters(cli);
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* report the error, or the number of bytes moved on success */
        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        RETURN(rc);
}
1966
/*
 * Build an asynchronous bulk RPC from the oaps queued on \a rpc_list and
 * hand it to ptlrpcd.
 *
 * \a page_count is the number of entries on \a rpc_list, \a cmd carries the
 * OBD_BRW_* flags and \a pol is the ptlrpcd policy passed to
 * ptlrpcd_add_req().
 *
 * On success the oaps are moved from \a rpc_list into the request's async
 * args and the request is queued with brw_interpret() as its interpreter.
 * On failure every oap still on \a rpc_list is completed with the error.
 *
 * Returns 0 on success or a negative errno.
 *
 * The most tricky part of this function is that it will return with
 * cli->cl_loi_list_lock held, on both the success and the failure path.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                  cfs_list_t *rpc_list, int page_count, int cmd,
                  pdl_policy_t pol)
{
        struct ptlrpc_request *req = NULL;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa = NULL;
        struct obdo *oa = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc, mpflag = 0;

        ENTRY;
        LASSERT(!cfs_list_empty(rpc_list));

        if (cmd & OBD_BRW_MEMALLOC)
                mpflag = cfs_memory_pressure_get_and_set();

        /* zeroed up front so that cra_capa is NULL for the capa_put() at
         * "out" even when we bail out early */
        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, rc = -ENOMEM);

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, rc = -ENOMEM);

        /* fill the page pointer array from the oap list; the cl_req is
         * allocated from the first page and the first oap's lock is
         * remembered */
        i = 0;
        cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1, 0);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }

        req->rq_interpret_reply = brw_interpret;
        if (cmd & OBD_BRW_MEMALLOC)
                req->rq_memalloc = 1;

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);

        /* move the oaps from the caller's list into the request's async
         * args, leaving rpc_list empty */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        cfs_list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        if (cmd & OBD_BRW_MEMALLOC)
                cfs_memory_pressure_restore(mpflag);

        capa_put(crattr.cra_capa);
        if (rc != 0) {
                LASSERT(req == NULL);

                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        cfs_list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                /* NOTE(review): oap_count, not rc, is passed
                                 * as the last argument here - confirm this
                                 * is intended */
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
        } else {
                /* NOTE(review): this declaration shadows the function-scope
                 * "tmp" used by the error branch above */
                struct osc_async_page *tmp = NULL;

                /* queued sync pages can be torn down while the pages
                 * were between the pending list and the rpc */
                LASSERT(aa != NULL);
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                        /* only one oap gets a request reference */
                        if (tmp == NULL)
                                tmp = oap;
                        if (oap->oap_interrupted && !req->rq_intr) {
                                CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                                                oap, req);
                                ptlrpc_mark_interrupted(req);
                        }
                }
                /* tmp is the first oap on the list; it holds the reference */
                if (tmp != NULL)
                        tmp->oap_request = ptlrpc_request_addref(req);

                DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
                          page_count, aa, cli->cl_r_in_flight,
                          cli->cl_w_in_flight);

                /* XXX: Maybe the caller can check the RPC bulk descriptor to
                 * see which CPU/NUMA node the majority of pages were allocated
                 * on, and try to assign the async RPC to the CPU core
                 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
                 *
                 * But on the other hand, we expect that multiple ptlrpcd
                 * threads and the initial write sponsor can run in parallel,
                 * especially when data checksum is enabled, which is CPU-bound
                 * operation and single ptlrpcd thread cannot process in time.
                 * So more ptlrpcd threads sharing BRW load
                 * (with PDL_POLICY_ROUND) seems better.
                 */
                ptlrpcd_add_req(req, pol, -1);
        }
        RETURN(rc);
}
2135
2136 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2137                                         struct ldlm_enqueue_info *einfo)
2138 {
2139         void *data = einfo->ei_cbdata;
2140         int set = 0;
2141
2142         LASSERT(lock != NULL);
2143         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2144         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2145         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2146         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2147
2148         lock_res_and_lock(lock);
2149         cfs_spin_lock(&osc_ast_guard);
2150
2151         if (lock->l_ast_data == NULL)
2152                 lock->l_ast_data = data;
2153         if (lock->l_ast_data == data)
2154                 set = 1;
2155
2156         cfs_spin_unlock(&osc_ast_guard);
2157         unlock_res_and_lock(lock);
2158
2159         return set;
2160 }
2161
2162 static int osc_set_data_with_check(struct lustre_handle *lockh,
2163                                    struct ldlm_enqueue_info *einfo)
2164 {
2165         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2166         int set = 0;
2167
2168         if (lock != NULL) {
2169                 set = osc_set_lock_data_with_check(lock, einfo);
2170                 LDLM_LOCK_PUT(lock);
2171         } else
2172                 CERROR("lockh %p, data %p - client evicted?\n",
2173                        lockh, einfo->ei_cbdata);
2174         return set;
2175 }
2176
2177 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2178                              ldlm_iterator_t replace, void *data)
2179 {
2180         struct ldlm_res_id res_id;
2181         struct obd_device *obd = class_exp2obd(exp);
2182
2183         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2184         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2185         return 0;
2186 }
2187
2188 /* find any ldlm lock of the inode in osc
2189  * return 0    not find
2190  *        1    find one
2191  *      < 0    error */
2192 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2193                            ldlm_iterator_t replace, void *data)
2194 {
2195         struct ldlm_res_id res_id;
2196         struct obd_device *obd = class_exp2obd(exp);
2197         int rc = 0;
2198
2199         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
2200         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2201         if (rc == LDLM_ITER_STOP)
2202                 return(1);
2203         if (rc == LDLM_ITER_CONTINUE)
2204                 return(0);
2205         return(rc);
2206 }
2207
2208 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2209                             obd_enqueue_update_f upcall, void *cookie,
2210                             int *flags, int agl, int rc)
2211 {
2212         int intent = *flags & LDLM_FL_HAS_INTENT;
2213         ENTRY;
2214
2215         if (intent) {
2216                 /* The request was created before ldlm_cli_enqueue call. */
2217                 if (rc == ELDLM_LOCK_ABORTED) {
2218                         struct ldlm_reply *rep;
2219                         rep = req_capsule_server_get(&req->rq_pill,
2220                                                      &RMF_DLM_REP);
2221
2222                         LASSERT(rep != NULL);
2223                         if (rep->lock_policy_res1)
2224                                 rc = rep->lock_policy_res1;
2225                 }
2226         }
2227
2228         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2229             (rc == 0)) {
2230                 *flags |= LDLM_FL_LVB_READY;
2231                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2232                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2233         }
2234
2235         /* Call the update callback. */
2236         rc = (*upcall)(cookie, rc);
2237         RETURN(rc);
2238 }
2239
/*
 * Interpret callback for an asynchronous lock enqueue described by \a aa.
 *
 * Completes the LDLM side with ldlm_cli_enqueue_fini(), then the OSC side
 * (including the caller's upcall) with osc_enqueue_fini(), and finally drops
 * the lock references that are held across the call.
 *
 * Returns the result of osc_enqueue_fini().
 */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        int *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* an aborted AGL enqueue passes no LVB buffer to
         * ldlm_cli_enqueue_fini() */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        /* NOTE(review): lock is only checked for NULL here, after the handle
         * has already been used for the addref above */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the additional reference taken by ldlm_lock_addref() above,
         * then the one from ldlm_handle2lock() */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2301
2302 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2303                         struct lov_oinfo *loi, int flags,
2304                         struct ost_lvb *lvb, __u32 mode, int rc)
2305 {
2306         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2307
2308         if (rc == ELDLM_OK) {
2309                 __u64 tmp;
2310
2311                 LASSERT(lock != NULL);
2312                 loi->loi_lvb = *lvb;
2313                 tmp = loi->loi_lvb.lvb_size;
2314                 /* Extend KMS up to the end of this lock and no further
2315                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2316                 if (tmp > lock->l_policy_data.l_extent.end)
2317                         tmp = lock->l_policy_data.l_extent.end + 1;
2318                 if (tmp >= loi->loi_kms) {
2319                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2320                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2321                         loi_kms_set(loi, tmp);
2322                 } else {
2323                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2324                                    LPU64"; leaving kms="LPU64", end="LPU64,
2325                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2326                                    lock->l_policy_data.l_extent.end);
2327                 }
2328                 ldlm_lock_allow_match(lock);
2329         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2330                 LASSERT(lock != NULL);
2331                 loi->loi_lvb = *lvb;
2332                 ldlm_lock_allow_match(lock);
2333                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2334                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2335                 rc = ELDLM_OK;
2336         }
2337
2338         if (lock != NULL) {
2339                 if (rc != ELDLM_OK)
2340                         ldlm_lock_fail_match(lock);
2341
2342                 LDLM_LOCK_PUT(lock);
2343         }
2344 }
2345 EXPORT_SYMBOL(osc_update_enqueue);
2346
/* Sentinel request-set value.  osc_enqueue_base() compares its rqset
 * argument against it and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to the set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2348
2349 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2350  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2351  * other synchronous requests, however keeping some locks and trying to obtain
2352  * others may take a considerable amount of time in a case of ost failure; and
2353  * when other sync requests do not get released lock from a client, the client
2354  * is excluded from the cluster -- such scenarious make the life difficult, so
2355  * release locks just after they are obtained. */
2356 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2357                      int *flags, ldlm_policy_data_t *policy,
2358                      struct ost_lvb *lvb, int kms_valid,
2359                      obd_enqueue_update_f upcall, void *cookie,
2360                      struct ldlm_enqueue_info *einfo,
2361                      struct lustre_handle *lockh,
2362                      struct ptlrpc_request_set *rqset, int async, int agl)
2363 {
2364         struct obd_device *obd = exp->exp_obd;
2365         struct ptlrpc_request *req = NULL;
2366         int intent = *flags & LDLM_FL_HAS_INTENT;
2367         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2368         ldlm_mode_t mode;
2369         int rc;
2370         ENTRY;
2371
2372         /* Filesystem lock extents are extended to page boundaries so that
2373          * dealing with the page cache is a little smoother.  */
2374         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2375         policy->l_extent.end |= ~CFS_PAGE_MASK;
2376
2377         /*
2378          * kms is not valid when either object is completely fresh (so that no
2379          * locks are cached), or object was evicted. In the latter case cached
2380          * lock cannot be used, because it would prime inode state with
2381          * potentially stale LVB.
2382          */
2383         if (!kms_valid)
2384                 goto no_match;
2385
2386         /* Next, search for already existing extent locks that will cover us */
2387         /* If we're trying to read, we also search for an existing PW lock.  The
2388          * VFS and page cache already protect us locally, so lots of readers/
2389          * writers can share a single PW lock.
2390          *
2391          * There are problems with conversion deadlocks, so instead of
2392          * converting a read lock to a write lock, we'll just enqueue a new
2393          * one.
2394          *
2395          * At some point we should cancel the read lock instead of making them
2396          * send us a blocking callback, but there are problems with canceling
2397          * locks out from other users right now, too. */
2398         mode = einfo->ei_mode;
2399         if (einfo->ei_mode == LCK_PR)
2400                 mode |= LCK_PW;
2401         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2402                                einfo->ei_type, policy, mode, lockh, 0);
2403         if (mode) {
2404                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2405
2406                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2407                         /* For AGL, if enqueue RPC is sent but the lock is not
2408                          * granted, then skip to process this strpe.
2409                          * Return -ECANCELED to tell the caller. */
2410                         ldlm_lock_decref(lockh, mode);
2411                         LDLM_LOCK_PUT(matched);
2412                         RETURN(-ECANCELED);
2413                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2414                         *flags |= LDLM_FL_LVB_READY;
2415                         /* addref the lock only if not async requests and PW
2416                          * lock is matched whereas we asked for PR. */
2417                         if (!rqset && einfo->ei_mode != mode)
2418                                 ldlm_lock_addref(lockh, LCK_PR);
2419                         if (intent) {
2420                                 /* I would like to be able to ASSERT here that
2421                                  * rss <= kms, but I can't, for reasons which
2422                                  * are explained in lov_enqueue() */
2423                         }
2424
2425                         /* We already have a lock, and it's referenced */
2426                         (*upcall)(cookie, ELDLM_OK);
2427
2428                         if (einfo->ei_mode != mode)
2429                                 ldlm_lock_decref(lockh, LCK_PW);
2430                         else if (rqset)
2431                                 /* For async requests, decref the lock. */
2432                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2433                         LDLM_LOCK_PUT(matched);
2434                         RETURN(ELDLM_OK);
2435                 } else {
2436                         ldlm_lock_decref(lockh, mode);
2437                         LDLM_LOCK_PUT(matched);
2438                 }
2439         }
2440
2441  no_match:
2442         if (intent) {
2443                 CFS_LIST_HEAD(cancels);
2444                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2445                                            &RQF_LDLM_ENQUEUE_LVB);
2446                 if (req == NULL)
2447                         RETURN(-ENOMEM);
2448
2449                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2450                 if (rc) {
2451                         ptlrpc_request_free(req);
2452                         RETURN(rc);
2453                 }
2454
2455                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2456                                      sizeof *lvb);
2457                 ptlrpc_request_set_replen(req);
2458         }
2459
2460         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2461         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2462
2463         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2464                               sizeof(*lvb), lockh, async);
2465         if (rqset) {
2466                 if (!rc) {
2467                         struct osc_enqueue_args *aa;
2468                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2469                         aa = ptlrpc_req_async_args(req);
2470                         aa->oa_ei = einfo;
2471                         aa->oa_exp = exp;
2472                         aa->oa_flags  = flags;
2473                         aa->oa_upcall = upcall;
2474                         aa->oa_cookie = cookie;
2475                         aa->oa_lvb    = lvb;
2476                         aa->oa_lockh  = lockh;
2477                         aa->oa_agl    = !!agl;
2478
2479                         req->rq_interpret_reply =
2480                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2481                         if (rqset == PTLRPCD_SET)
2482                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2483                         else
2484                                 ptlrpc_set_add_req(rqset, req);
2485                 } else if (intent) {
2486                         ptlrpc_req_finished(req);
2487                 }
2488                 RETURN(rc);
2489         }
2490
2491         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2492         if (intent)
2493                 ptlrpc_req_finished(req);
2494
2495         RETURN(rc);
2496 }
2497
2498 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2499                        struct ldlm_enqueue_info *einfo,
2500                        struct ptlrpc_request_set *rqset)
2501 {
2502         struct ldlm_res_id res_id;
2503         int rc;
2504         ENTRY;
2505
2506         osc_build_res_name(oinfo->oi_md->lsm_object_id,
2507                            oinfo->oi_md->lsm_object_seq, &res_id);
2508
2509         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2510                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2511                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2512                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2513                               rqset, rqset != NULL, 0);
2514         RETURN(rc);
2515 }
2516
2517 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2518                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2519                    int *flags, void *data, struct lustre_handle *lockh,
2520                    int unref)
2521 {
2522         struct obd_device *obd = exp->exp_obd;
2523         int lflags = *flags;
2524         ldlm_mode_t rc;
2525         ENTRY;
2526
2527         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2528                 RETURN(-EIO);
2529
2530         /* Filesystem lock extents are extended to page boundaries so that
2531          * dealing with the page cache is a little smoother */
2532         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2533         policy->l_extent.end |= ~CFS_PAGE_MASK;
2534
2535         /* Next, search for already existing extent locks that will cover us */
2536         /* If we're trying to read, we also search for an existing PW lock.  The
2537          * VFS and page cache already protect us locally, so lots of readers/
2538          * writers can share a single PW lock. */
2539         rc = mode;
2540         if (mode == LCK_PR)
2541                 rc |= LCK_PW;
2542         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2543                              res_id, type, policy, rc, lockh, unref);
2544         if (rc) {
2545                 if (data != NULL) {
2546                         if (!osc_set_data_with_check(lockh, data)) {
2547                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2548                                         ldlm_lock_decref(lockh, rc);
2549                                 RETURN(0);
2550                         }
2551                 }
2552                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2553                         ldlm_lock_addref(lockh, LCK_PR);
2554                         ldlm_lock_decref(lockh, LCK_PW);
2555                 }
2556                 RETURN(rc);
2557         }
2558         RETURN(rc);
2559 }
2560
2561 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2562 {
2563         ENTRY;
2564
2565         if (unlikely(mode == LCK_GROUP))
2566                 ldlm_lock_decref_and_cancel(lockh, mode);
2567         else
2568                 ldlm_lock_decref(lockh, mode);
2569
2570         RETURN(0);
2571 }
2572
2573 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2574                       __u32 mode, struct lustre_handle *lockh)
2575 {
2576         ENTRY;
2577         RETURN(osc_cancel_base(lockh, mode));
2578 }
2579
2580 static int osc_cancel_unused(struct obd_export *exp,
2581                              struct lov_stripe_md *lsm,
2582                              ldlm_cancel_flags_t flags,
2583                              void *opaque)
2584 {
2585         struct obd_device *obd = class_exp2obd(exp);
2586         struct ldlm_res_id res_id, *resp = NULL;
2587
2588         if (lsm != NULL) {
2589                 resp = osc_build_res_name(lsm->lsm_object_id,
2590                                           lsm->lsm_object_seq, &res_id);
2591         }
2592
2593         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2594 }
2595
2596 static int osc_statfs_interpret(const struct lu_env *env,
2597                                 struct ptlrpc_request *req,
2598                                 struct osc_async_args *aa, int rc)
2599 {
2600         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
2601         struct obd_statfs *msfs;
2602         __u64 used;
2603         ENTRY;
2604
2605         if (rc == -EBADR)
2606                 /* The request has in fact never been sent
2607                  * due to issues at a higher level (LOV).
2608                  * Exit immediately since the caller is
2609                  * aware of the problem and takes care
2610                  * of the clean up */
2611                  RETURN(rc);
2612
2613         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2614             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2615                 GOTO(out, rc = 0);
2616
2617         if (rc != 0)
2618                 GOTO(out, rc);
2619
2620         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2621         if (msfs == NULL) {
2622                 GOTO(out, rc = -EPROTO);
2623         }
2624
2625         /* Reinitialize the RDONLY and DEGRADED flags at the client
2626          * on each statfs, so they don't stay set permanently. */
2627         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
2628
2629         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
2630                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
2631         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
2632                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
2633
2634         if (unlikely(msfs->os_state & OS_STATE_READONLY))
2635                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
2636         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
2637                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
2638
2639         /* Add a bit of hysteresis so this flag isn't continually flapping,
2640          * and ensure that new files don't get extremely fragmented due to
2641          * only a small amount of available space in the filesystem.
2642          * We want to set the NOSPC flag when there is less than ~0.1% free
2643          * and clear it when there is at least ~0.2% free space, so:
2644          *                   avail < ~0.1% max          max = avail + used
2645          *            1025 * avail < avail + used       used = blocks - free
2646          *            1024 * avail < used
2647          *            1024 * avail < blocks - free
2648          *                   avail < ((blocks - free) >> 10)
2649          *
2650          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
2651          * lose that amount of space so in those cases we report no space left
2652          * if their is less than 1 GB left.                             */
2653         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
2654         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
2655                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
2656                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
2657         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2658                           (msfs->os_ffree > 64) &&
2659                           (msfs->os_bavail > (used << 1)))) {
2660                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
2661                                              OSCC_FLAG_NOSPC_BLK);
2662         }
2663
2664         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
2665                      (msfs->os_bavail < used)))
2666                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
2667
2668         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
2669
2670         *aa->aa_oi->oi_osfs = *msfs;
2671 out:
2672         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2673         RETURN(rc);
2674 }
2675
2676 static int osc_statfs_async(struct obd_export *exp,
2677                             struct obd_info *oinfo, __u64 max_age,
2678                             struct ptlrpc_request_set *rqset)
2679 {
2680         struct obd_device     *obd = class_exp2obd(exp);
2681         struct ptlrpc_request *req;
2682         struct osc_async_args *aa;
2683         int                    rc;
2684         ENTRY;
2685
2686         /* We could possibly pass max_age in the request (as an absolute
2687          * timestamp or a "seconds.usec ago") so the target can avoid doing
2688          * extra calls into the filesystem if that isn't necessary (e.g.
2689          * during mount that would help a bit).  Having relative timestamps
2690          * is not so great if request processing is slow, while absolute
2691          * timestamps are not ideal because they need time synchronization. */
2692         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2693         if (req == NULL)
2694                 RETURN(-ENOMEM);
2695
2696         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2697         if (rc) {
2698                 ptlrpc_request_free(req);
2699                 RETURN(rc);
2700         }
2701         ptlrpc_request_set_replen(req);
2702         req->rq_request_portal = OST_CREATE_PORTAL;
2703         ptlrpc_at_set_req_timeout(req);
2704
2705         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2706                 /* procfs requests not want stat in wait for avoid deadlock */
2707                 req->rq_no_resend = 1;
2708                 req->rq_no_delay = 1;
2709         }
2710
2711         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2712         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2713         aa = ptlrpc_req_async_args(req);
2714         aa->aa_oi = oinfo;
2715
2716         ptlrpc_set_add_req(rqset, req);
2717         RETURN(0);
2718 }
2719
2720 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2721                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2722 {
2723         struct obd_device     *obd = class_exp2obd(exp);
2724         struct obd_statfs     *msfs;
2725         struct ptlrpc_request *req;
2726         struct obd_import     *imp = NULL;
2727         int rc;
2728         ENTRY;
2729
2730         /*Since the request might also come from lprocfs, so we need
2731          *sync this with client_disconnect_export Bug15684*/
2732         cfs_down_read(&obd->u.cli.cl_sem);
2733         if (obd->u.cli.cl_import)
2734                 imp = class_import_get(obd->u.cli.cl_import);
2735         cfs_up_read(&obd->u.cli.cl_sem);
2736         if (!imp)
2737                 RETURN(-ENODEV);
2738
2739         /* We could possibly pass max_age in the request (as an absolute
2740          * timestamp or a "seconds.usec ago") so the target can avoid doing
2741          * extra calls into the filesystem if that isn't necessary (e.g.
2742          * during mount that would help a bit).  Having relative timestamps
2743          * is not so great if request processing is slow, while absolute
2744          * timestamps are not ideal because they need time synchronization. */
2745         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2746
2747         class_import_put(imp);
2748
2749         if (req == NULL)
2750                 RETURN(-ENOMEM);
2751
2752         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2753         if (rc) {
2754                 ptlrpc_request_free(req);
2755                 RETURN(rc);
2756         }
2757         ptlrpc_request_set_replen(req);
2758         req->rq_request_portal = OST_CREATE_PORTAL;
2759         ptlrpc_at_set_req_timeout(req);
2760
2761         if (flags & OBD_STATFS_NODELAY) {
2762                 /* procfs requests not want stat in wait for avoid deadlock */
2763                 req->rq_no_resend = 1;
2764                 req->rq_no_delay = 1;
2765         }
2766
2767         rc = ptlrpc_queue_wait(req);
2768         if (rc)
2769                 GOTO(out, rc);
2770
2771         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2772         if (msfs == NULL) {
2773                 GOTO(out, rc = -EPROTO);
2774         }
2775
2776         *osfs = *msfs;
2777
2778         EXIT;
2779  out:
2780         ptlrpc_req_finished(req);
2781         return rc;
2782 }
2783
2784 /* Retrieve object striping information.
2785  *
2786  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2787  * the maximum number of OST indices which will fit in the user buffer.
2788  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2789  */
2790 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2791 {
2792         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2793         struct lov_user_md_v3 lum, *lumk;
2794         struct lov_user_ost_data_v1 *lmm_objects;
2795         int rc = 0, lum_size;
2796         ENTRY;
2797
2798         if (!lsm)
2799                 RETURN(-ENODATA);
2800
2801         /* we only need the header part from user space to get lmm_magic and
2802          * lmm_stripe_count, (the header part is common to v1 and v3) */
2803         lum_size = sizeof(struct lov_user_md_v1);
2804         if (cfs_copy_from_user(&lum, lump, lum_size))
2805                 RETURN(-EFAULT);
2806
2807         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2808             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2809                 RETURN(-EINVAL);
2810
2811         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2812         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2813         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2814         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2815
2816         /* we can use lov_mds_md_size() to compute lum_size
2817          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2818         if (lum.lmm_stripe_count > 0) {
2819                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2820                 OBD_ALLOC(lumk, lum_size);
2821                 if (!lumk)
2822                         RETURN(-ENOMEM);
2823
2824                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2825                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2826                 else
2827                         lmm_objects = &(lumk->lmm_objects[0]);
2828                 lmm_objects->l_object_id = lsm->lsm_object_id;
2829         } else {
2830                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2831                 lumk = &lum;
2832         }
2833
2834         lumk->lmm_object_id = lsm->lsm_object_id;
2835         lumk->lmm_object_seq = lsm->lsm_object_seq;
2836         lumk->lmm_stripe_count = 1;
2837
2838         if (cfs_copy_to_user(lump, lumk, lum_size))
2839                 rc = -EFAULT;
2840
2841         if (lumk != &lum)
2842                 OBD_FREE(lumk, lum_size);
2843
2844         RETURN(rc);
2845 }
2846
2847
2848 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2849                          void *karg, void *uarg)
2850 {
2851         struct obd_device *obd = exp->exp_obd;
2852         struct obd_ioctl_data *data = karg;
2853         int err = 0;
2854         ENTRY;
2855
2856         if (!cfs_try_module_get(THIS_MODULE)) {
2857                 CERROR("Can't get module. Is it alive?");
2858                 return -EINVAL;
2859         }
2860         switch (cmd) {
2861         case OBD_IOC_LOV_GET_CONFIG: {
2862                 char *buf;
2863                 struct lov_desc *desc;
2864                 struct obd_uuid uuid;
2865
2866                 buf = NULL;
2867                 len = 0;
2868                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2869                         GOTO(out, err = -EINVAL);
2870
2871                 data = (struct obd_ioctl_data *)buf;
2872
2873                 if (sizeof(*desc) > data->ioc_inllen1) {
2874                         obd_ioctl_freedata(buf, len);
2875                         GOTO(out, err = -EINVAL);
2876                 }
2877
2878                 if (data->ioc_inllen2 < sizeof(uuid)) {
2879                         obd_ioctl_freedata(buf, len);
2880                         GOTO(out, err = -EINVAL);
2881                 }
2882
2883                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2884                 desc->ld_tgt_count = 1;
2885                 desc->ld_active_tgt_count = 1;
2886                 desc->ld_default_stripe_count = 1;
2887                 desc->ld_default_stripe_size = 0;
2888                 desc->ld_default_stripe_offset = 0;
2889                 desc->ld_pattern = 0;
2890                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2891
2892                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2893
2894                 err = cfs_copy_to_user((void *)uarg, buf, len);
2895                 if (err)
2896                         err = -EFAULT;
2897                 obd_ioctl_freedata(buf, len);
2898                 GOTO(out, err);
2899         }
2900         case LL_IOC_LOV_SETSTRIPE:
2901                 err = obd_alloc_memmd(exp, karg);
2902                 if (err > 0)
2903                         err = 0;
2904                 GOTO(out, err);
2905         case LL_IOC_LOV_GETSTRIPE:
2906                 err = osc_getstripe(karg, uarg);
2907                 GOTO(out, err);
2908         case OBD_IOC_CLIENT_RECOVER:
2909                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2910                                             data->ioc_inlbuf1, 0);
2911                 if (err > 0)
2912                         err = 0;
2913                 GOTO(out, err);
2914         case IOC_OSC_SET_ACTIVE:
2915                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2916                                                data->ioc_offset);
2917                 GOTO(out, err);
2918         case OBD_IOC_POLL_QUOTACHECK:
2919                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2920                 GOTO(out, err);
2921         case OBD_IOC_PING_TARGET:
2922                 err = ptlrpc_obd_ping(obd);
2923                 GOTO(out, err);
2924         default:
2925                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2926                        cmd, cfs_curproc_comm());
2927                 GOTO(out, err = -ENOTTY);
2928         }
2929 out:
2930         cfs_module_put(THIS_MODULE);
2931         return err;
2932 }
2933
2934 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2935                         obd_count keylen, void *key, __u32 *vallen, void *val,
2936                         struct lov_stripe_md *lsm)
2937 {
2938         ENTRY;
2939         if (!vallen || !val)
2940                 RETURN(-EFAULT);
2941
2942         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2943                 __u32 *stripe = val;
2944                 *vallen = sizeof(*stripe);
2945                 *stripe = 0;
2946                 RETURN(0);
2947         } else if (KEY_IS(KEY_LAST_ID)) {
2948                 struct ptlrpc_request *req;
2949                 obd_id                *reply;
2950                 char                  *tmp;
2951                 int                    rc;
2952
2953                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2954                                            &RQF_OST_GET_INFO_LAST_ID);
2955                 if (req == NULL)
2956                         RETURN(-ENOMEM);
2957
2958                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2959                                      RCL_CLIENT, keylen);
2960                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2961                 if (rc) {
2962                         ptlrpc_request_free(req);
2963                         RETURN(rc);
2964                 }
2965
2966                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2967                 memcpy(tmp, key, keylen);
2968
2969                 req->rq_no_delay = req->rq_no_resend = 1;
2970                 ptlrpc_request_set_replen(req);
2971                 rc = ptlrpc_queue_wait(req);
2972                 if (rc)
2973                         GOTO(out, rc);
2974
2975                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2976                 if (reply == NULL)
2977                         GOTO(out, rc = -EPROTO);
2978
2979                 *((obd_id *)val) = *reply;
2980         out:
2981                 ptlrpc_req_finished(req);
2982                 RETURN(rc);
2983         } else if (KEY_IS(KEY_FIEMAP)) {
2984                 struct ptlrpc_request *req;
2985                 struct ll_user_fiemap *reply;
2986                 char *tmp;
2987                 int rc;
2988
2989                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2990                                            &RQF_OST_GET_INFO_FIEMAP);
2991                 if (req == NULL)
2992                         RETURN(-ENOMEM);
2993
2994                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2995                                      RCL_CLIENT, keylen);
2996                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2997                                      RCL_CLIENT, *vallen);
2998                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2999                                      RCL_SERVER, *vallen);
3000
3001                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3002                 if (rc) {
3003                         ptlrpc_request_free(req);
3004                         RETURN(rc);
3005                 }
3006
3007                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3008                 memcpy(tmp, key, keylen);
3009                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3010                 memcpy(tmp, val, *vallen);
3011
3012                 ptlrpc_request_set_replen(req);
3013                 rc = ptlrpc_queue_wait(req);
3014                 if (rc)
3015                         GOTO(out1, rc);
3016
3017                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3018                 if (reply == NULL)
3019                         GOTO(out1, rc = -EPROTO);
3020
3021                 memcpy(val, reply, *vallen);
3022         out1:
3023                 ptlrpc_req_finished(req);
3024
3025                 RETURN(rc);
3026         }
3027
3028         RETURN(-EINVAL);
3029 }
3030
3031 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3032 {
3033         struct llog_ctxt *ctxt;
3034         int rc = 0;
3035         ENTRY;
3036
3037         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3038         if (ctxt) {
3039                 rc = llog_initiator_connect(ctxt);
3040                 llog_ctxt_put(ctxt);
3041         } else {
3042                 /* XXX return an error? skip setting below flags? */
3043         }
3044
3045         cfs_spin_lock(&imp->imp_lock);
3046         imp->imp_server_timeout = 1;
3047         imp->imp_pingable = 1;
3048         cfs_spin_unlock(&imp->imp_lock);
3049         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3050
3051         RETURN(rc);
3052 }
3053
3054 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3055                                           struct ptlrpc_request *req,
3056                                           void *aa, int rc)
3057 {
3058         ENTRY;
3059         if (rc != 0)
3060                 RETURN(rc);
3061
3062         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3063 }
3064
3065 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3066                               obd_count keylen, void *key, obd_count vallen,
3067                               void *val, struct ptlrpc_request_set *set)
3068 {
3069         struct ptlrpc_request *req;
3070         struct obd_device     *obd = exp->exp_obd;
3071         struct obd_import     *imp = class_exp2cliimp(exp);
3072         char                  *tmp;
3073         int                    rc;
3074         ENTRY;
3075
3076         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3077
3078         if (KEY_IS(KEY_NEXT_ID)) {
3079                 obd_id new_val;
3080                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3081
3082                 if (vallen != sizeof(obd_id))
3083                         RETURN(-ERANGE);
3084                 if (val == NULL)
3085                         RETURN(-EINVAL);
3086
3087                 if (vallen != sizeof(obd_id))
3088                         RETURN(-EINVAL);
3089
3090                 /* avoid race between allocate new object and set next id
3091                  * from ll_sync thread */
3092                 cfs_spin_lock(&oscc->oscc_lock);
3093                 new_val = *((obd_id*)val) + 1;
3094                 if (new_val > oscc->oscc_next_id)
3095                         oscc->oscc_next_id = new_val;
3096                 cfs_spin_unlock(&oscc->oscc_lock);
3097                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3098                        exp->exp_obd->obd_name,
3099                        obd->u.cli.cl_oscc.oscc_next_id);
3100
3101                 RETURN(0);
3102         }
3103
3104         if (KEY_IS(KEY_CHECKSUM)) {
3105                 if (vallen != sizeof(int))
3106                         RETURN(-EINVAL);
3107                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3108                 RETURN(0);
3109         }
3110
3111         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3112                 sptlrpc_conf_client_adapt(obd);
3113                 RETURN(0);
3114         }
3115
3116         if (KEY_IS(KEY_FLUSH_CTX)) {
3117                 sptlrpc_import_flush_my_ctx(imp);
3118                 RETURN(0);
3119         }
3120
3121         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3122                 RETURN(-EINVAL);
3123
3124         /* We pass all other commands directly to OST. Since nobody calls osc
3125            methods directly and everybody is supposed to go through LOV, we
3126            assume lov checked invalid values for us.
3127            The only recognised values so far are evict_by_nid and mds_conn.
3128            Even if something bad goes through, we'd get a -EINVAL from OST
3129            anyway. */
3130
3131         if (KEY_IS(KEY_GRANT_SHRINK))
3132                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3133         else
3134                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3135
3136         if (req == NULL)
3137                 RETURN(-ENOMEM);
3138
3139         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3140                              RCL_CLIENT, keylen);
3141         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3142                              RCL_CLIENT, vallen);
3143         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3144         if (rc) {
3145                 ptlrpc_request_free(req);
3146                 RETURN(rc);
3147         }
3148
3149         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3150         memcpy(tmp, key, keylen);
3151         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3152         memcpy(tmp, val, vallen);
3153
3154         if (KEY_IS(KEY_MDS_CONN)) {
3155                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3156
3157                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
3158                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3159                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
3160                 req->rq_no_delay = req->rq_no_resend = 1;
3161                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3162         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3163                 struct osc_grant_args *aa;
3164                 struct obdo *oa;
3165
3166                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3167                 aa = ptlrpc_req_async_args(req);
3168                 OBDO_ALLOC(oa);
3169                 if (!oa) {
3170                         ptlrpc_req_finished(req);
3171                         RETURN(-ENOMEM);
3172                 }
3173                 *oa = ((struct ost_body *)val)->oa;
3174                 aa->aa_oa = oa;
3175                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3176         }
3177
3178         ptlrpc_request_set_replen(req);
3179         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3180                 LASSERT(set != NULL);
3181                 ptlrpc_set_add_req(set, req);
3182                 ptlrpc_check_set(NULL, set);
3183         } else
3184                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3185
3186         RETURN(0);
3187 }
3188
3189
3190 static struct llog_operations osc_size_repl_logops = {
3191         lop_cancel: llog_obd_repl_cancel
3192 };
3193
3194 static struct llog_operations osc_mds_ost_orig_logops;
3195
3196 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3197                            struct obd_device *tgt, struct llog_catid *catid)
3198 {
3199         int rc;
3200         ENTRY;
3201
3202         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3203                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3204         if (rc) {
3205                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3206                 GOTO(out, rc);
3207         }
3208
3209         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3210                         NULL, &osc_size_repl_logops);
3211         if (rc) {
3212                 struct llog_ctxt *ctxt =
3213                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3214                 if (ctxt)
3215                         llog_cleanup(ctxt);
3216                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3217         }
3218         GOTO(out, rc);
3219 out:
3220         if (rc) {
3221                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
3222                        obd->obd_name, tgt->obd_name, catid, rc);
3223                 CERROR("logid "LPX64":0x%x\n",
3224                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3225         }
3226         return rc;
3227 }
3228
3229 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3230                          struct obd_device *disk_obd, int *index)
3231 {
3232         struct llog_catid catid;
3233         static char name[32] = CATLIST;
3234         int rc;
3235         ENTRY;
3236
3237         LASSERT(olg == &obd->obd_olg);
3238
3239         cfs_mutex_lock(&olg->olg_cat_processing);
3240         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
3241         if (rc) {
3242                 CERROR("rc: %d\n", rc);
3243                 GOTO(out, rc);
3244         }
3245
3246         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
3247                obd->obd_name, *index, catid.lci_logid.lgl_oid,
3248                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
3249
3250         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
3251         if (rc) {
3252                 CERROR("rc: %d\n", rc);
3253                 GOTO(out, rc);
3254         }
3255
3256         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
3257         if (rc) {
3258                 CERROR("rc: %d\n", rc);
3259                 GOTO(out, rc);
3260         }
3261
3262  out:
3263         cfs_mutex_unlock(&olg->olg_cat_processing);
3264
3265         return rc;
3266 }
3267
3268 static int osc_llog_finish(struct obd_device *obd, int count)
3269 {
3270         struct llog_ctxt *ctxt;
3271         int rc = 0, rc2 = 0;
3272         ENTRY;
3273
3274         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3275         if (ctxt)
3276                 rc = llog_cleanup(ctxt);
3277
3278         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3279         if (ctxt)
3280                 rc2 = llog_cleanup(ctxt);
3281         if (!rc)
3282                 rc = rc2;
3283
3284         RETURN(rc);
3285 }
3286
3287 static int osc_reconnect(const struct lu_env *env,
3288                          struct obd_export *exp, struct obd_device *obd,
3289                          struct obd_uuid *cluuid,
3290                          struct obd_connect_data *data,
3291                          void *localdata)
3292 {
3293         struct client_obd *cli = &obd->u.cli;
3294
3295         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3296                 long lost_grant;
3297
3298                 client_obd_list_lock(&cli->cl_loi_list_lock);
3299                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3300                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3301                 lost_grant = cli->cl_lost_grant;
3302                 cli->cl_lost_grant = 0;
3303                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3304
3305                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3306                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
3307                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
3308                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3309                        " ocd_grant: %d\n", data->ocd_connect_flags,
3310                        data->ocd_version, data->ocd_grant);
3311         }
3312
3313         RETURN(0);
3314 }
3315
3316 static int osc_disconnect(struct obd_export *exp)
3317 {
3318         struct obd_device *obd = class_exp2obd(exp);
3319         struct llog_ctxt  *ctxt;
3320         int rc;
3321
3322         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3323         if (ctxt) {
3324                 if (obd->u.cli.cl_conn_count == 1) {
3325                         /* Flush any remaining cancel messages out to the
3326                          * target */
3327                         llog_sync(ctxt, exp);
3328                 }
3329                 llog_ctxt_put(ctxt);
3330         } else {
3331                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3332                        obd);
3333         }
3334
3335         rc = client_disconnect_export(exp);
3336         /**
3337          * Initially we put del_shrink_grant before disconnect_export, but it
3338          * causes the following problem if setup (connect) and cleanup
3339          * (disconnect) are tangled together.
3340          *      connect p1                     disconnect p2
3341          *   ptlrpc_connect_import
3342          *     ...............               class_manual_cleanup
3343          *                                     osc_disconnect
3344          *                                     del_shrink_grant
3345          *   ptlrpc_connect_interrupt
3346          *     init_grant_shrink
3347          *   add this client to shrink list
3348          *                                      cleanup_osc
3349          * Bang! pinger trigger the shrink.
3350          * So the osc should be disconnected from the shrink list, after we
3351          * are sure the import has been destroyed. BUG18662
3352          */
3353         if (obd->u.cli.cl_import == NULL)
3354                 osc_del_shrink_grant(&obd->u.cli);
3355         return rc;
3356 }
3357
3358 static int osc_import_event(struct obd_device *obd,
3359                             struct obd_import *imp,
3360                             enum obd_import_event event)
3361 {
3362         struct client_obd *cli;
3363         int rc = 0;
3364
3365         ENTRY;
3366         LASSERT(imp->imp_obd == obd);
3367
3368         switch (event) {
3369         case IMP_EVENT_DISCON: {
3370                 /* Only do this on the MDS OSC's */
3371                 if (imp->imp_server_timeout) {
3372                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3373
3374                         cfs_spin_lock(&oscc->oscc_lock);
3375                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3376                         cfs_spin_unlock(&oscc->oscc_lock);
3377                 }
3378                 cli = &obd->u.cli;
3379                 client_obd_list_lock(&cli->cl_loi_list_lock);
3380                 cli->cl_avail_grant = 0;
3381                 cli->cl_lost_grant = 0;
3382                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3383                 break;
3384         }
3385         case IMP_EVENT_INACTIVE: {
3386                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3387                 break;
3388         }
3389         case IMP_EVENT_INVALIDATE: {
3390                 struct ldlm_namespace *ns = obd->obd_namespace;
3391                 struct lu_env         *env;
3392                 int                    refcheck;
3393
3394                 env = cl_env_get(&refcheck);
3395                 if (!IS_ERR(env)) {
3396                         /* Reset grants */
3397                         cli = &obd->u.cli;
3398                         client_obd_list_lock(&cli->cl_loi_list_lock);
3399                         /* all pages go to failing rpcs due to the invalid
3400                          * import */
3401                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3402                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3403
3404                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3405                         cl_env_put(env, &refcheck);
3406                 } else
3407                         rc = PTR_ERR(env);
3408                 break;
3409         }
3410         case IMP_EVENT_ACTIVE: {
3411                 /* Only do this on the MDS OSC's */
3412                 if (imp->imp_server_timeout) {
3413                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3414
3415                         cfs_spin_lock(&oscc->oscc_lock);
3416                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
3417                                               OSCC_FLAG_NOSPC_BLK);
3418                         cfs_spin_unlock(&oscc->oscc_lock);
3419                 }
3420                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3421                 break;
3422         }
3423         case IMP_EVENT_OCD: {
3424                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3425
3426                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3427                         osc_init_grant(&obd->u.cli, ocd);
3428
3429                 /* See bug 7198 */
3430                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3431                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3432
3433                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3434                 break;
3435         }
3436         case IMP_EVENT_DEACTIVATE: {
3437                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3438                 break;
3439         }
3440         case IMP_EVENT_ACTIVATE: {
3441                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3442                 break;
3443         }
3444         default:
3445                 CERROR("Unknown import event %d\n", event);
3446                 LBUG();
3447         }
3448         RETURN(rc);
3449 }
3450
3451 /**
3452  * Determine whether the lock can be canceled before replaying the lock
3453  * during recovery, see bug16774 for detailed information.
3454  *
3455  * \retval zero the lock can't be canceled
3456  * \retval other ok to cancel
3457  */
3458 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3459 {
3460         check_res_locked(lock->l_resource);
3461
3462         /*
3463          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3464          *
3465          * XXX as a future improvement, we can also cancel unused write lock
3466          * if it doesn't have dirty data and active mmaps.
3467          */
3468         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3469             (lock->l_granted_mode == LCK_PR ||
3470              lock->l_granted_mode == LCK_CR) &&
3471             (osc_dlm_lock_pageref(lock) == 0))
3472                 RETURN(1);
3473
3474         RETURN(0);
3475 }
3476
3477 static int brw_queue_work(const struct lu_env *env, void *data)
3478 {
3479         struct client_obd *cli = data;
3480
3481         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3482
3483         client_obd_list_lock(&cli->cl_loi_list_lock);
3484         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3485         client_obd_list_unlock(&cli->cl_loi_list_lock);
3486         RETURN(0);
3487 }
3488
3489 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3490 {
3491         struct client_obd *cli = &obd->u.cli;
3492         int rc;
3493         ENTRY;
3494
3495         ENTRY;
3496         rc = ptlrpcd_addref();
3497         if (rc)
3498                 RETURN(rc);
3499
3500         rc = client_obd_setup(obd, lcfg);
3501         if (rc == 0) {
3502                 void *handler;
3503                 handler = ptlrpcd_alloc_work(cli->cl_import,
3504                                              brw_queue_work, cli);
3505                 if (!IS_ERR(handler))
3506                         cli->cl_writeback_work = handler;
3507                 else
3508                         rc = PTR_ERR(handler);
3509         }
3510
3511         if (rc == 0) {
3512                 struct lprocfs_static_vars lvars = { 0 };
3513
3514                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3515                 lprocfs_osc_init_vars(&lvars);
3516                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3517                         lproc_osc_attach_seqstat(obd);
3518                         sptlrpc_lprocfs_cliobd_attach(obd);
3519                         ptlrpc_lprocfs_register_obd(obd);
3520                 }
3521
3522                 oscc_init(obd);
3523                 /* We need to allocate a few requests more, because
3524                    brw_interpret tries to create new requests before freeing
3525                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3526                    reserved, but I afraid that might be too much wasted RAM
3527                    in fact, so 2 is just my guess and still should work. */
3528                 cli->cl_import->imp_rq_pool =
3529                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3530                                             OST_MAXREQSIZE,
3531                                             ptlrpc_add_rqs_to_pool);
3532
3533                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3534
3535                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3536         }
3537
3538         if (rc)
3539                 ptlrpcd_decref();
3540         RETURN(rc);
3541 }
3542
3543 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3544 {
3545         int rc = 0;
3546         ENTRY;
3547
3548         switch (stage) {
3549         case OBD_CLEANUP_EARLY: {
3550                 struct obd_import *imp;
3551                 imp = obd->u.cli.cl_import;
3552                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3553                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3554                 ptlrpc_deactivate_import(imp);
3555                 cfs_spin_lock(&imp->imp_lock);
3556                 imp->imp_pingable = 0;
3557                 cfs_spin_unlock(&imp->imp_lock);
3558                 break;
3559         }
3560         case OBD_CLEANUP_EXPORTS: {
3561                 struct client_obd *cli = &obd->u.cli;
3562                 /* LU-464
3563                  * for echo client, export may be on zombie list, wait for
3564                  * zombie thread to cull it, because cli.cl_import will be
3565                  * cleared in client_disconnect_export():
3566                  *   class_export_destroy() -> obd_cleanup() ->
3567                  *   echo_device_free() -> echo_client_cleanup() ->
3568                  *   obd_disconnect() -> osc_disconnect() ->
3569                  *   client_disconnect_export()
3570                  */
3571                 obd_zombie_barrier();
3572                 if (cli->cl_writeback_work) {
3573                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3574                         cli->cl_writeback_work = NULL;
3575                 }
3576                 obd_cleanup_client_import(obd);
3577                 ptlrpc_lprocfs_unregister_obd(obd);
3578                 lprocfs_obd_cleanup(obd);
3579                 rc = obd_llog_finish(obd, 0);
3580                 if (rc != 0)
3581                         CERROR("failed to cleanup llogging subsystems\n");
3582                 break;
3583                 }
3584         }
3585         RETURN(rc);
3586 }
3587
3588 int osc_cleanup(struct obd_device *obd)
3589 {
3590         int rc;
3591
3592         ENTRY;
3593
3594         /* free memory of osc quota cache */
3595         osc_quota_cleanup(obd);
3596
3597         rc = client_obd_cleanup(obd);
3598
3599         ptlrpcd_decref();
3600         RETURN(rc);
3601 }
3602
3603 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3604 {
3605         struct lprocfs_static_vars lvars = { 0 };
3606         int rc = 0;
3607
3608         lprocfs_osc_init_vars(&lvars);
3609
3610         switch (lcfg->lcfg_command) {
3611         default:
3612                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3613                                               lcfg, obd);
3614                 if (rc > 0)
3615                         rc = 0;
3616                 break;
3617         }
3618
3619         return(rc);
3620 }
3621
3622 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3623 {
3624         return osc_process_config_base(obd, buf);
3625 }
3626
3627 struct obd_ops osc_obd_ops = {
3628         .o_owner                = THIS_MODULE,
3629         .o_setup                = osc_setup,
3630         .o_precleanup           = osc_precleanup,
3631         .o_cleanup              = osc_cleanup,
3632         .o_add_conn             = client_import_add_conn,
3633         .o_del_conn             = client_import_del_conn,
3634         .o_connect              = client_connect_import,
3635         .o_reconnect            = osc_reconnect,
3636         .o_disconnect           = osc_disconnect,
3637         .o_statfs               = osc_statfs,
3638         .o_statfs_async         = osc_statfs_async,
3639         .o_packmd               = osc_packmd,
3640         .o_unpackmd             = osc_unpackmd,
3641         .o_precreate            = osc_precreate,
3642         .o_create               = osc_create,
3643         .o_create_async         = osc_create_async,
3644         .o_destroy              = osc_destroy,
3645         .o_getattr              = osc_getattr,
3646         .o_getattr_async        = osc_getattr_async,
3647         .o_setattr              = osc_setattr,
3648         .o_setattr_async        = osc_setattr_async,
3649         .o_brw                  = osc_brw,
3650         .o_punch                = osc_punch,
3651         .o_sync                 = osc_sync,
3652         .o_enqueue              = osc_enqueue,
3653         .o_change_cbdata        = osc_change_cbdata,
3654         .o_find_cbdata          = osc_find_cbdata,
3655         .o_cancel               = osc_cancel,
3656         .o_cancel_unused        = osc_cancel_unused,
3657         .o_iocontrol            = osc_iocontrol,
3658         .o_get_info             = osc_get_info,
3659         .o_set_info_async       = osc_set_info_async,
3660         .o_import_event         = osc_import_event,
3661         .o_llog_init            = osc_llog_init,
3662         .o_llog_finish          = osc_llog_finish,
3663         .o_process_config       = osc_process_config,
3664         .o_quotactl             = osc_quotactl,
3665         .o_quotacheck           = osc_quotacheck,
3666         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
3667 };
3668
3669 extern struct lu_kmem_descr osc_caches[];
3670 extern cfs_spinlock_t       osc_ast_guard;
3671 extern cfs_lock_class_key_t osc_ast_guard_class;
3672
3673 int __init osc_init(void)
3674 {
3675         struct lprocfs_static_vars lvars = { 0 };
3676         int rc;
3677         ENTRY;
3678
3679         /* print an address of _any_ initialized kernel symbol from this
3680          * module, to allow debugging with gdb that doesn't support data
3681          * symbols from modules.*/
3682         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3683
3684         rc = lu_kmem_init(osc_caches);
3685
3686         lprocfs_osc_init_vars(&lvars);
3687
3688         osc_quota_init();
3689         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3690                                  LUSTRE_OSC_NAME, &osc_device_type);
3691         if (rc) {
3692                 lu_kmem_fini(osc_caches);
3693                 RETURN(rc);
3694         }
3695
3696         cfs_spin_lock_init(&osc_ast_guard);
3697         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3698
3699         osc_mds_ost_orig_logops = llog_lvfs_ops;
3700         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3701         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3702         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3703         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3704
3705         RETURN(rc);
3706 }
3707
#ifdef __KERNEL__
/* Module unload: shut down quota support, unregister the OSC obd type and
 * release the kmem caches created by osc_init(). */
static void /*__exit*/ osc_exit(void)
{
        osc_quota_exit();
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */