반응형

오픈소스 비행제어소프트웨어인 PX4 혹은 Ardupilot은 비행체 내부 혹은 외부 장치간의 통신 프로토콜로 Mavlink 프로토콜을 사용한다.

주고받은 로그 또한 Mavlink 프로토콜로 저장되어 있다. 이때 Python API인 pymavlink에서 mavlogdump.py 라는 스크립트를 통해서 바이너리 형태인 비행체 로그 데이터를 사용자가 특정 프로토콜의 메시지만 추출할 수 있다.

문제점은 해당 코드가 argparse를 사용하기 때문에 CLI 환경에서, 명령어를 통해서만 로그 데이터를 추출할 수 있고 함수 형태로 호출하기는 어렵다는 점이 있다.

이를 위해서 메시지 타입 별로 csv 확장자로 저장하는 함수로 바꾸었다.

이런 식으로 만들게 되면 다른 코드에서 함수 형태로 호출할 수 있어서 os.system() 으로 bash 쉘로 호출하지 않아도 된다.

 

코드 정리

작업 내용은 스크립트를 단일 함수로 변경했으며, argparse으로 받을 인자 중에 필요한 것만 변수 args_XXXX 으로 모아서 정리했다.

함수의 리턴 값은 없으며, 로그 파일의 csv 파일 출력은 입력 파일이 있는 폴더에 output_csv 폴더를 만들어서 넣는다.

입력 인자

  • input_file_path : 비행체 로그 파일 (*.bin) 의 절대경로 위치
  • type_name : Mavlink 메시지 타입

출력 위치

  • [INPUT_FILE_DIR]\output_csv\[INPUT_FILE_NAME]_[MSG_TYPE].csv

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
#!D:\ProgramFiles\miniconda3\envs\plot\python.exe
'''
example program that dumps a Mavlink log file. The log file is
assumed to be in the format that qgroundcontrol uses, which consists
of a series of MAVLink packets, each with a 64 bit timestamp
header. The timestamp is in microseconds since 1970 (unix epoch)
'''
from __future__ import print_function
 
import array
import fnmatch
import json
import os
import struct
import sys
import time
 
# Detect python version
if sys.version_info[0< 3:
    runningPython3 = False
else:
    runningPython3 = True
 
try:
    from pymavlink.mavextra import *
except:
    print("WARNING: Numpy missing, mathematical notation will not be supported..")
 
from argparse import ArgumentParser
# parser = ArgumentParser(description=__doc__)
 
# parser.add_argument("--no-timestamps", dest="notimestamps", action='store_true', help="Log doesn't have timestamps")
# parser.add_argument("--planner", action='store_true', help="use planner file format")
# parser.add_argument("--robust", action='store_true', help="Enable robust parsing (skip over bad data)")
# parser.add_argument("--condition", default=None, help="select packets by condition")
# parser.add_argument("-c", "--compress", action='store_true', help="Compress .mat file data")
# parser.add_argument("-f", "--follow", action='store_true', help="keep waiting for more data at end of file")
# parser.add_argument("-q", "--quiet", action='store_true', help="don't display packets")
# parser.add_argument("-o", "--output", default=None, help="output matching packets to give file")
# parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o")
# parser.add_argument("--format", default=None, help="Change the output format between 'standard', 'json', 'csv' and 'mat'. For the CSV output, you must supply types that you want. For MAT output, specify output file with --mat_file")
# parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv")
# parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)")
# parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)")
# parser.add_argument("--mat_file", dest="mat_file", help="Output file path for MATLAB file output. Only applies when --format=mat")
# parser.add_argument("--dialect", default="ardupilotmega", help="MAVLink dialect")
# parser.add_argument("--zero-time-base", action='store_true', help="use Z time base for DF logs")
# parser.add_argument("--no-bad-data", action='store_true', help="Don't output corrupted messages")
# parser.add_argument("--show-source", action='store_true', help="Show source system ID and component ID")
# parser.add_argument("--show-seq", action='store_true', help="Show sequence numbers")
# parser.add_argument("--show-types", action='store_true', help="Shows all message types available on opened log")
# parser.add_argument("--source-system", type=int, default=None, help="filter by source system ID")
# parser.add_argument("--source-component", type=int, default=None, help="filter by source component ID")
# parser.add_argument("--link", type=int, default=None, help="filter by comms link ID")
# parser.add_argument("--verbose", action='store_true', help="Dump messages in a much more verbose (but non-parseable) format")
# parser.add_argument("--mav10", action='store_true', help="parse as MAVLink1")
# parser.add_argument("--reduce", type=int, default=0, help="reduce streaming messages")
# parser.add_argument("--reduce-rate", type=float, default=0, help="reduce messages to maximum rate in Hz")
# parser.add_argument("log", metavar="LOG")
# parser.add_argument("--profile", action='store_true', help="run the Yappi python profiler")
# parser.add_argument("--meta", action='store_true', help="output meta-data msgs even if not matching condition")
 
def dump_bin_to_csv(input_file_path, type_name):
 
    file_dir = os.path.split(input_file_path)[0]
    file_name = os.path.split(input_file_path)[1][:-4]
    output_path = file_dir + '/output_csv/' + file_name + "_{}.csv".format(type_name)
    output_dir = file_dir + '/output_csv/'
    os.makedirs(output_dir, exist_ok=True)
 
    # print(file_dir)
    # print(file_name)
    # print(output_path)
 
    args_log                = input_file_path
    args_output             = output_path
    args_notimestamps       = False
    args_planner            = False
    args_robust             = False
    args_condition          = None
    args_format             = 'csv'
    args_csv_sep            = ","
    args_types              = type_name
    args_nottypes           = None
    args_mat_file           = None # output file path
    args_dialect            = "ardupilotmega"
    args_source_system      = None
    args_source_component   = None
    args_link               = None
    args_verbose            = False
    args_mav10              = False
    args_reduce             = 0
    args_reduce_rate        = 0.0
    args_profile            = False
    args_meta               = False
    args_compress           = False
    args_follow             = False
    args_quiet              = False
    args_parms              = False
    args_zero_time_base     = False
 
 
    if not args_mav10:
        os.environ['MAVLINK20'= '1'
 
    import inspect
 
    from pymavlink import mavutil
 
 
    # if args_profile:
    #     import yappi    # We do the import here so that we won't barf if run normally and yappi not available
    #     yappi.start()
 
    if args_format == 'mat':
        # Load these modules here, as they're only needed for MAT file creation
        import scipy.io
        import numpy as np
 
    filename = args_log
    mlog = mavutil.mavlink_connection(filename, planner_format=args_planner,
                                    notimestamps=args_notimestamps,
                                    robust_parsing=args_robust,
                                    dialect=args_dialect,
                                    zero_time_base=args_zero_time_base)
 
    output = None
    if args_output:
        output = open(args_output, mode='wb')
 
    types = args_types
    if types is not None:
        types = types.split(',')
 
    nottypes = args_nottypes
    if nottypes is not None:
        nottypes = nottypes.split(',')
 
    ext = os.path.splitext(filename)[1]
    isbin = ext in ['.bin''.BIN''.px4log']
    islog = ext in ['.log''.LOG'# NOTE: "islog" does not mean a tlog
    istlog = ext in ['.tlog''.TLOG']
 
    # list of msgs to reduce in rate when --reduce is used
    reduction_msgs = ['NKF*''XKF*''IMU*''AHR2''BAR*''ATT''BAT*''CTUN''NTUN''GP*''IMT*''MAG*''PL''POS''POW*''RATE''RC*''RFND''UBX*''VIBE''NKQ*''MOT*''CTRL''FTS*''DSF''CST*''LOS*''UWB*']
    reduction_yes = set()
    reduction_no = set()
    reduction_count = {}
 
    def reduce_msg(mtype, reduction_ratio):
        '''return True if this msg should be discarded by reduction'''
        global reduction_count, reduction_msgs, reduction_yes, reduction_no
        if mtype in reduction_no:
            return False
        if not mtype in reduction_yes:
            for m in reduction_msgs:
                if fnmatch.fnmatch(mtype, m):
                    reduction_yes.add(mtype)
                    reduction_count[mtype] = 0
                    break
            if not mtype in reduction_yes:
                reduction_no.add(mtype)
                return False
        reduction_count[mtype] += 1
        if reduction_count[mtype] == reduction_ratio:
            reduction_count[mtype] = 0
            return False
        return True
 
    last_msg_rate_t = {}
 
    def reduce_rate_msg(m, reduction_rate):
        '''return True if this msg should be discarded by reduction'''
        global last_msg_rate_t
        mtype = m.get_type()
        if mtype in ['PARM','MSG','FMT','FMTU','MULT','MODE','EVT']:
            return False
        t = getattr(m,'_timestamp',None)
        if t is None:
            return False
        if not mtype in last_msg_rate_t:
            last_msg_rate_t[mtype] = t
        dt = t - last_msg_rate_t[mtype]
 
        if dt < 0 or dt >= 1.0/reduction_rate:
            last_msg_rate_t[mtype] = t
            return False
        return True
 
    if args_csv_sep == "tab":
        args_csv_sep = ","
 
    # swiped from DFReader.py
    def to_string(s):
        '''desperate attempt to convert a string regardless of what garbage we get'''
        if isinstance(s, str):
            return s
        if sys.version_info[0== 2:
            # In python2 we want to return unicode for passed in unicode
            return s
        return s.decode(errors="backslashreplace")
 
    def match_type(mtype, patterns):
        '''return True if mtype matches pattern'''
        for p in patterns:
            if fnmatch.fnmatch(mtype, p):
                return True
        return False
 
    # Write out a header row as we're outputting in CSV format.
    fields = ['timestamp']
    offsets = {}
    if istlog and args_format == 'csv'# we know our fields from the get-go
        try:
            currentOffset = 1 # Store how many fields in we are for each message.
            for type_item in types:
                try:
                    typeClass = "MAVLink_{0}_message".format(type_item.lower())
                    if runningPython3:
                        fields += [type_item + '.' + x for x in inspect.getfullargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]]
                    else:
                        fields += [type_item + '.' + x for x in inspect.getargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]]
                    offsets[type_item] = currentOffset
                    currentOffset += len(fields)
                except IndexError:
                    quit()
        except TypeError:
            print("You must specify a list of message types if outputting CSV format via the --types argument.")
            exit()
 
        # The first line output are names for all columns
        csv_out = ["" for x in fields]
        # print(args_csv_sep.join(fields))
        line = args_csv_sep.join(fields) + "\n"
        output.write(line.encode())
 
    if isbin and args_format == 'csv'# need to accumulate columns from message
        if types is None or len(types) != 1:
            print("Need exactly one type when dumping CSV from bin file")
            quit()
 
    # Track the last timestamp value. Used for compressing data for the CSV output format.
    last_timestamp = None
 
    # Track types found
    available_types = set()
 
    # for DF logs pre-calculate types list
    match_types=None
    if types is not None and hasattr(mlog, 'name_to_id'):
        for k in mlog.name_to_id.keys():
            if match_type(k, types):
                if nottypes is not None and match_type(k, nottypes):
                    continue
                if match_types is None:
                    match_types = []
                match_types.append(k)
 
    if isbin and args_format == 'csv':
        # we need FMT messages for column headings
        match_types.append("FMT")
 
    # Keep track of data from the current timestep. If the following timestep has the same data, it's stored in here as well. Output should therefore have entirely unique timesteps.
    MAT = {}    # Dictionary to hold output data for 'mat' format option
    while True:
        m = mlog.recv_match(blocking=args_follow, type=match_types)
        if m is None:
            # write the final csv line before exiting
            if args_format == 'csv' and csv_out:
                csv_out[0= "{:.8f}".format(last_timestamp)
                line = args_csv_sep.join(csv_out) + "\n"
                output.write(line.encode())
            break
        m_type = m.get_type()
        available_types.add(m_type)
        if isbin and m_type == "FMT" and args_format == 'csv':
            if m.Name == types[0]:
                fields += m.Columns.split(',')
                csv_out = ["" for x in fields]
                line = args_csv_sep.join(fields) + "\n"
                output.write(line.encode())
 
        if args_reduce and reduce_msg(m_type, args_reduce):
            continue
 
        if args_reduce_rate > 0 and reduce_rate_msg(m, args_reduce_rate):
            continue
 
        # if output is not None:
        #     if (isbin or islog) and m_type == "FMT":
        #         output.write(m.get_msgbuf())
        #         continue
        #     if (isbin or islog) and (m_type == "PARM" and args_parms):
        #         output.write(m.get_msgbuf())
        #         continue
        #     if m_type == 'PARAM_VALUE' and args_parms:
        #         timestamp = getattr(m, '_timestamp', None)
        #         output.write(struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf())
        #         continue
 
        if not mavutil.evaluate_condition(args_condition, mlog.messages) and (
                not (m_type in ['FMT''FMTU''MULT','PARM','MODE'and args_meta)):
            continue
        if args_source_system is not None and args_source_system != m.get_srcSystem():
            continue
        if args_source_component is not None and args_source_component != m.get_srcComponent():
            continue
        if args_link is not None and args_link != m._link:
            continue
 
        if types is not None and m_type != 'BAD_DATA' and not match_type(m_type, types):
            continue
 
        if nottypes is not None and match_type(m_type, nottypes):
            continue
 
        # Ignore BAD_DATA messages is the user requested or if they're because of a bad prefix. The
        # latter case is normally because of a mismatched MAVLink version.
        # if m_type == 'BAD_DATA' and (args_no_bad_data is True or m.reason == "Bad prefix"):
        #     continue
 
        # Grab the timestamp.
        timestamp = getattr(m, '_timestamp'0.0)
 
        # If we're just logging, pack in the timestamp and data into the output file.
        # if output:
        #     if not (isbin or islog):
        #         output.write(struct.pack('>Q', int(timestamp*1.0e6)))
        #     try:
        #         output.write(m.get_msgbuf())
        #     except Exception as ex:
        #         print("Failed to write msg %s: %s" % (m_type, str(ex)))
 
        # If quiet is specified, don't display output to the terminal.
        if args_quiet:
            continue
 
        # If JSON was ordered, serve it up. Split it nicely into metadata and data.
        if args_format == 'json':
            # Format our message as a Python dict, which gets us almost to proper JSON format
            data = m.to_dict()
 
            # Remove the mavpackettype value as we specify that later.
            del data['mavpackettype']
 
            # Also, if it's a BAD_DATA message, make it JSON-compatible by removing array objects
            if 'data' in data and type(data['data']) is not dict:
                data['data'= list(data['data'])
 
            # Prepare the message as a single object with 'meta' and 'data' keys holding
            # the message's metadata and actual data respectively.
            meta = {"type": m_type, "timestamp": timestamp}
            # if args_show_source:
            #     meta["srcSystem"] = m.get_srcSystem()
            #     meta["srcComponent"] = m.get_srcComponent()
 
            # convert any array.array (e.g. packed-16-bit fft readings) into lists:
            for key in data.keys():
                if type(data[key]) == array.array:
                    data[key] = list(data[key])
            # convert any byte-strings into utf-8 strings.  Don't die trying.
            for key in data.keys():
                if type(data[key]) == bytes:
                    data[key] = to_string(data[key])
            outMsg = {"meta": meta, "data": data}
 
            # Now print out this object with stringified properly.
            print(json.dumps(outMsg))
 
        # CSV format outputs columnar data with a user-specified delimiter
        elif args_format == 'csv':
            data = m.to_dict()
 
            # If this message has a duplicate timestamp, copy its data into the existing data list. Also
            # do this if it's the first message encountered.
            if timestamp == last_timestamp or last_timestamp is None:
                if isbin:
                    newData = [str(data[y]) if y != "timestamp" else "" for y in fields]
                else:
                    newData = [str(data[y.split('.')[-1]]) if y.split('.')[0== m_type and y.split('.')[-1in data else "" for y in fields]
 
                for i, val in enumerate(newData):
                    if val:
                        csv_out[i] = val
 
            # Otherwise if this is a new timestamp, print out the old output data, and store the current message for later output.
            else:
                csv_out[0= "{:.8f}".format(last_timestamp)
                # print(args_csv_sep.join(csv_out))
                line = args_csv_sep.join(csv_out) + "\n"
                output.write(line.encode())
                if isbin:
                    csv_out = [str(data[y]) if y != "timestamp" else "" for y in fields]
                else:
                    csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0== m_type and y.split('.')[-1in data else "" for y in fields]
        # MAT format outputs data to a .mat file specified through the
        # --mat_file option
        elif args_format == 'mat':
            # If this packet contains data (i.e. is not a FMT
            # packet), append the data in this packet to the
            # corresponding list
            if m_type != 'FMT':
 
                # If this packet type has not yet been
                # seen, add a new entry to the big dict
                if m_type not in MAT:
                    MAT[m_type] = {}
 
                md = m.to_dict()
                del md['mavpackettype']
                cols = md.keys()
                for col in cols:
                    # If this column hasn't had data entered,
                    # make a new key and list
                    if col in MAT[m_type]:
                        MAT[m_type][col].append(md[col])
                    else:
                        MAT[m_type][col] = [md[col]]
        # elif args_show_types:
        #     # do nothing
        #     pass
        elif args_verbose and istlog:
            mavutil.dump_message_verbose(sys.stdout, m)
            print("")
        else:
            # Otherwise we output in a standard Python dict-style format
            s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S",
                                            time.localtime(timestamp)),
                                int(timestamp*100.0)%100, m)
            # if args_show_source:
            #     s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent())
            # if args_show_seq:
            #     s += " seq=%u" % m.get_seq()
            print(s)
 
        # Update our last timestamp value.
        last_timestamp = timestamp
 
    # Export the .mat file
    if args_format == 'mat':
        scipy.io.savemat(args_mat_file, MAT, do_compression=args_compress)
 
    # if args_show_types:
    #     for msgType in available_types:
    #         print(msgType)
 
    # if args_profile:
    #     yappi.get_func_stats().print_all()
    #     yappi.get_thread_stats().print_all()
 
if __name__ == "__main__":
 
    input = "somewhere/FLCC_LogFile.BIN"
    output = "somewhere_you_want_to_save"
    type_list = ['ATT''MAG''IOMC']
    # type_list = ['ATT']
    for type_name in type_list:
        dump_bin_to_csv(input, type_name)
cs

** EOF **

728x90

+ Recent posts