decode-file.py
3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
"""
This file demonstrates how to use sherpa-onnx Python API to recognize
a single file.
Please refer to
https://k2-fsa.github.io/sherpa/onnx/index.html
to install sherpa-onnx and to download the pre-trained models
used in this file.
"""
import argparse
import time
import wave
from pathlib import Path
import numpy as np
import sherpa_onnx
def assert_file_exists(filename: str):
assert Path(
filename
).is_file(), f"{filename} does not exist!\nPlease refer to https://k2-fsa.github.io/sherpa/onnx/pretrained_models/index.html to download it"
def get_args():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"--tokens",
type=str,
help="Path to tokens.txt",
)
parser.add_argument(
"--encoder",
type=str,
help="Path to the encoder model",
)
parser.add_argument(
"--decoder",
type=str,
help="Path to the decoder model",
)
parser.add_argument(
"--joiner",
type=str,
help="Path to the joiner model",
)
parser.add_argument(
"--num-threads",
type=int,
default=1,
help="Number of threads for neural network computation",
)
parser.add_argument(
"--decoding-method",
type=str,
default="greedy_search",
help="Valid values are greedy_search and modified_beam_search",
)
parser.add_argument(
"--wave-filename",
type=str,
help="""Path to the wave filename. Must be 16 kHz,
mono with 16-bit samples""",
)
return parser.parse_args()
def main():
sample_rate = 16000
args = get_args()
assert_file_exists(args.encoder)
assert_file_exists(args.decoder)
assert_file_exists(args.joiner)
assert_file_exists(args.tokens)
if not Path(args.wave_filename).is_file():
print(f"{args.wave_filename} does not exist!")
return
recognizer = sherpa_onnx.OnlineRecognizer(
tokens=args.tokens,
encoder=args.encoder,
decoder=args.decoder,
joiner=args.joiner,
num_threads=args.num_threads,
sample_rate=sample_rate,
feature_dim=80,
decoding_method=args.decoding_method,
)
with wave.open(args.wave_filename) as f:
assert f.getframerate() == sample_rate, f.getframerate()
assert f.getnchannels() == 1, f.getnchannels()
assert f.getsampwidth() == 2, f.getsampwidth() # it is in bytes
num_samples = f.getnframes()
samples = f.readframes(num_samples)
samples_int16 = np.frombuffer(samples, dtype=np.int16)
samples_float32 = samples_int16.astype(np.float32)
samples_float32 = samples_float32 / 32768
duration = len(samples_float32) / sample_rate
start_time = time.time()
print("Started!")
stream = recognizer.create_stream()
stream.accept_waveform(sample_rate, samples_float32)
tail_paddings = np.zeros(int(0.2 * sample_rate), dtype=np.float32)
stream.accept_waveform(sample_rate, tail_paddings)
stream.input_finished()
while recognizer.is_ready(stream):
recognizer.decode_stream(stream)
print(recognizer.get_result(stream))
print("Done!")
end_time = time.time()
elapsed_seconds = end_time - start_time
rtf = elapsed_seconds / duration
print(f"num_threads: {args.num_threads}")
print(f"decoding_method: {args.decoding_method}")
print(f"Wave duration: {duration:.3f} s")
print(f"Elapsed time: {elapsed_seconds:.3f} s")
print(f"Real time factor (RTF): {elapsed_seconds:.3f}/{duration:.3f} = {rtf:.3f}")
if __name__ == "__main__":
main()