1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# mitm_test.py
"""
Do not edit this file. Changing it may cause your solution to fail on the autograder.
Implement your solution in `mitm.py`
"""
import math
import sys
import threading
import queue
import random
import hashlib
# import the mitm.py file
from mitm import mitm
## The actual key-exchange in the autograder will also use the same modulus and base parameters
MODULUS = 12345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012345679012281
BASE = 4384165182867240584805930970951575013697
SECRET_FLAG = "placeholder flag"
# This is the first user in the key-exchange
def alice():
    """Run Alice's half of the Diffie-Hellman exchange and scripted handshake.

    Alice binds a channel to Bob, publishes ``pow(BASE, secret, MODULUS)``,
    derives the shared key from Bob's public value, trades the numbered
    "secret message" strings in a fixed order, then sends the flag followed
    by "goodbye".

    Raises:
        AssertionError: If a decrypted handshake message is not the text
            expected at that step.
    """
    send, recv = bind(own_name="alice", remote_name="bob")
    print("[Alice] starting key exchange")

    # Private exponent for this run.
    # NOTE(review): the ``random`` module is not a cryptographically secure
    # generator; a real key exchange should draw this from ``secrets``.
    alice_secret = random.getrandbits(128)

    # Public value = BASE ** secret mod MODULUS, sent as a decimal string.
    alice_public = pow(BASE, alice_secret, MODULUS)
    send(str(alice_public))

    # Bob's public value arrives as a decimal string.
    bob_public = int(recv())

    # Shared key = (peer public value) ** secret mod MODULUS.
    shared_secret = pow(bob_public, alice_secret, MODULUS)

    def expect(expected):
        # The receive must not live inside an ``assert`` statement: under
        # ``python -O`` asserts are stripped, which would also skip recv()
        # and desynchronise the message sequence.
        received = decrypt(recv(), shared_secret)
        if received != expected:
            raise AssertionError(
                f"[Alice] expected {expected!r}, got {received!r}"
            )

    # Exchange known plaintexts to confirm data is transmitted correctly.
    # NOTE(review): presumably these checks also surface tampering or a key
    # mismatch between the two parties.
    print("[Alice] starting handshake...")
    send(encrypt("secret message 1", shared_secret))
    expect("secret message 2")
    send(encrypt("secret message 3", shared_secret))
    send(encrypt("secret message 4", shared_secret))
    expect("secret message 5")
    expect("secret message 6")

    print("[Alice] sending flag...")
    send(encrypt(f"Here is your flag {SECRET_FLAG} :)", shared_secret))
    send(encrypt("goodbye", shared_secret))
    print("[Alice] goodbye")
# This is the second user in the key-exchange
def bob():
    """Run Bob's half of the Diffie-Hellman exchange and scripted handshake.

    Bob binds a channel to Alice, publishes ``pow(BASE, secret, MODULUS)``,
    derives the shared key from Alice's public value, trades the numbered
    "secret message" strings in a fixed order, then receives the flag
    message and the closing "goodbye".

    Raises:
        AssertionError: If a decrypted handshake message is not the text
            expected at that step.
    """
    send, recv = bind(own_name="bob", remote_name="alice")
    print("[Bob] starting key exchange")

    # Private exponent for this run.
    # NOTE(review): the ``random`` module is not a cryptographically secure
    # generator; a real key exchange should draw this from ``secrets``.
    bob_secret = random.getrandbits(128)

    # Public value = BASE ** secret mod MODULUS, sent as a decimal string.
    bob_public = pow(BASE, bob_secret, MODULUS)
    send(str(bob_public))

    # Alice's public value arrives as a decimal string.
    alice_public = int(recv())

    # Shared key = (peer public value) ** secret mod MODULUS.
    shared_secret = pow(alice_public, bob_secret, MODULUS)

    def expect(expected):
        # The receive must not live inside an ``assert`` statement: under
        # ``python -O`` asserts are stripped, which would also skip recv()
        # and desynchronise the message sequence.
        received = decrypt(recv(), shared_secret)
        if received != expected:
            raise AssertionError(
                f"[Bob] expected {expected!r}, got {received!r}"
            )

    # Exchange known plaintexts to confirm data is transmitted correctly.
    print("[Bob] starting handshake...")
    send(encrypt("secret message 2", shared_secret))
    expect("secret message 1")
    send(encrypt("secret message 5", shared_secret))
    send(encrypt("secret message 6", shared_secret))
    expect("secret message 3")
    expect("secret message 4")

    print("[Bob] waiting for flag...")
    # The flag message is consumed and decrypted but not used further here.
    _flag = decrypt(recv(), shared_secret)
    expect("goodbye")
    print("[Bob] goodbye")
"""
Encrypt the message using the given numeric key, returning a string
This function may be useful as part of your attack.
"""
def encrypt(msg, key):
    """Encrypt the text ``msg`` under the numeric ``key`` and return a string.

    A random nonce is drawn, the SHA-512 digest of ``str(nonce) + str(key)``
    is used as a repeating keystream, and the encoded bytes of ``msg`` are
    XORed with it. The result has the form ``"<nonce>+<hex ciphertext>"``.
    """
    # The nonce ensures that the same message encrypted with the same key
    # does not produce the same output each time.
    nonce_text = str(random.randint(int(1e10), int(1e11)))
    keystream = hashlib.sha512((nonce_text + str(key)).encode()).digest()
    span = len(keystream)
    plaintext = msg.encode()
    ciphertext = bytes(
        byte ^ keystream[pos % span] for pos, byte in enumerate(plaintext)
    )
    return f"{nonce_text}+{ciphertext.hex()}"
"""
Decrypt the message using the given numeric key, returning a string
This function may be useful as part of your attack.
"""
def decrypt(msg_enc, key):
    """Decrypt a ``"<nonce>+<hex ciphertext>"`` string with the numeric ``key``.

    The keystream is the SHA-512 digest of ``str(nonce) + str(key)``,
    repeated as needed and XORed with the ciphertext bytes. Any resulting
    byte of 128 or more is replaced by a space before decoding, so the
    return value is always decodable text.
    """
    fields = msg_enc.split("+")
    nonce = int(fields[0])
    ciphertext = bytes.fromhex(fields[1])
    keystream = hashlib.sha512((str(nonce) + str(key)).encode()).digest()
    span = len(keystream)
    blank = ord(" ")
    plain = bytearray()
    for pos, byte in enumerate(ciphertext):
        value = byte ^ keystream[pos % span]
        # Replace bytes of 128 or more so that decode() cannot fail.
        plain.append(value if value < 128 else blank)
    return plain.decode()
"""
The code beyond this point is just the mechanics of the message passing through
the MiTM. It should not affect your solution.
"""
# Inboxes: recv_blocking_internal() reads a party's incoming items from here.
queues = {party: queue.Queue() for party in ("alice", "bob")}
# Outboxes: send_internal() places each party's outgoing messages here.
mitm_queues = {party: queue.Queue() for party in ("alice", "bob")}
def send_internal(msg, src_name, dst_name):
    """Place ``msg`` on the outgoing queue that belongs to ``src_name``.

    ``dst_name`` is accepted but not used by this function.
    """
    assert src_name in mitm_queues
    outbox = mitm_queues[src_name]
    outbox.put(msg)
def recv_blocking_internal(own_name):
    """Block with no timeout until an item is available for ``own_name`` and return it."""
    inbox = queues[own_name]
    return inbox.get(block=True, timeout=None)
def bind(own_name, remote_name):
    """Return a ``(send, recv)`` pair of callables for the party ``own_name``.

    ``send(msg)`` forwards ``msg`` through ``send_internal`` with the bound
    names; ``recv()`` blocks for the next incoming item and returns its
    element at index 1.
    """
    def send(msg):
        send_internal(msg, own_name, remote_name)

    def recv():
        envelope = recv_blocking_internal(own_name)
        return envelope[1]

    return send, recv
if __name__ == "__main__":
    # Run each party on its own thread. ``daemon=True`` replaces the
    # deprecated ``Thread.setDaemon()`` call; daemon threads do not keep the
    # interpreter alive on their own.
    alice_thread = threading.Thread(name="alice_thread", target=alice, daemon=True)
    bob_thread = threading.Thread(name="bob_thread", target=bob, daemon=True)
    alice_thread.start()
    bob_thread.start()

    # Hand the man-in-the-middle four callables, in this order:
    #   1. deliver a message to Alice (tagged as coming from "bob"),
    #   2. deliver a message to Bob (tagged as coming from "alice"),
    #   3. poll Alice's outgoing queue,
    #   4. poll Bob's outgoing queue.
    # The deliver callables ignore ``None``; the poll callables return
    # ``None`` when the queue is empty.
    mitm(
        lambda msg: queues["alice"].put(("bob", msg)) if msg is not None else None,
        lambda msg: queues["bob"].put(("alice", msg)) if msg is not None else None,
        lambda: None if mitm_queues["alice"].empty() else mitm_queues["alice"].get(),
        lambda: None if mitm_queues["bob"].empty() else mitm_queues["bob"].get(),
    )

    # join() blocks the calling thread until the target thread has finished.
    alice_thread.join()
    bob_thread.join()
|