network.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # The Qubes OS Project, http://www.qubes-os.org
  5. #
  6. # Copyright (C) 2015 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU General Public License
  10. # as published by the Free Software Foundation; either version 2
  11. # of the License, or (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License
  19. # along with this program; if not, write to the Free Software
  20. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
  21. # USA.
  22. #
  23. import multiprocessing
  24. import os
  25. import subprocess
  26. import unittest
  27. import time
  28. from qubes.qubes import QubesVmCollection, defaults
  29. VM_PREFIX = "test-"
  30. class VmNetworkingTests(unittest.TestCase):
  31. ping_ip = "ping -W 1 -n -c 1 192.168.123.45"
  32. ping_name = "ping -W 1 -c 1 test.example.com"
  33. def run_cmd(self, vm, cmd, user="root"):
  34. p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
  35. p.stdin.close()
  36. p.stdout.read()
  37. return p.wait()
  38. def setUp(self):
  39. self.qc = QubesVmCollection()
  40. self.qc.lock_db_for_writing()
  41. self.qc.load()
  42. self.testnetvm = self.qc.add_new_vm("QubesNetVm",
  43. name="%snetvm1" % VM_PREFIX,
  44. template=self.qc.get_default_template())
  45. self.testnetvm.create_on_disk(verbose=False)
  46. self.testvm1 = self.qc.add_new_vm("QubesAppVm",
  47. name="%svm2" % VM_PREFIX,
  48. template=self.qc.get_default_template())
  49. self.testvm1.create_on_disk(verbose=False)
  50. self.testvm1.netvm = self.testnetvm
  51. self.qc.save()
  52. self.qc.unlock_db()
  53. self.configure_netvm()
  54. def configure_netvm(self):
  55. def run_netvm_cmd(cmd):
  56. if self.run_cmd(self.testnetvm, cmd) != 0:
  57. self.fail("Command '%s' failed" % cmd)
  58. if not self.testnetvm.is_running():
  59. self.testnetvm.start()
  60. # Ensure that dnsmasq is installed:
  61. p = self.testnetvm.run("dnsmasq --version", user="root",
  62. passio_popen=True)
  63. if p.wait() != 0:
  64. self.skipTest("dnsmasq not installed")
  65. run_netvm_cmd("ip link add test0 type dummy")
  66. run_netvm_cmd("ip link set test0 up")
  67. run_netvm_cmd("ip addr add 192.168.123.45/24 dev test0")
  68. run_netvm_cmd("iptables -I INPUT -d 192.168.123.45 -j ACCEPT")
  69. run_netvm_cmd("dnsmasq -a 192.168.123.45 -A /example.com/192.168.123.45 -i test0 -z")
  70. run_netvm_cmd("echo nameserver 192.168.123.45 > /etc/resolv.conf")
  71. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  72. def remove_vms(self, vms):
  73. self.qc.lock_db_for_writing()
  74. self.qc.load()
  75. for vm in vms:
  76. if isinstance(vm, str):
  77. vm = self.qc.get_vm_by_name(vm)
  78. else:
  79. vm = self.qc[vm.qid]
  80. if vm.is_running():
  81. try:
  82. vm.force_shutdown()
  83. except:
  84. pass
  85. try:
  86. vm.remove_from_disk()
  87. except OSError:
  88. pass
  89. self.qc.pop(vm.qid)
  90. self.qc.save()
  91. self.qc.unlock_db()
  92. def tearDown(self):
  93. vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
  94. VM_PREFIX)]
  95. self.remove_vms(vmlist)
  96. def test_000_simple_networking(self):
  97. self.testvm1.start()
  98. self.assertEqual(self.run_cmd(self.testvm1,
  99. "ping -c 1 192.168.123.45"), 0)
  100. self.assertEqual(self.run_cmd(self.testvm1,
  101. "ping -c 1 test.example.com"), 0)
  102. def test_010_simple_proxyvm(self):
  103. self.qc.lock_db_for_writing()
  104. self.qc.load()
  105. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  106. name="%sproxy" % VM_PREFIX,
  107. template=self.qc.get_default_template())
  108. self.proxy.create_on_disk(verbose=False)
  109. self.proxy.netvm = self.testnetvm
  110. self.testvm1.netvm = self.proxy
  111. self.qc.save()
  112. self.qc.unlock_db()
  113. self.testvm1.start()
  114. self.assertTrue(self.proxy.is_running())
  115. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  116. "Ping by IP from ProxyVM failed")
  117. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  118. "Ping by name from ProxyVM failed")
  119. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  120. "Ping by IP from AppVM failed")
  121. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  122. "Ping by IP from AppVM failed")
  123. def test_020_simple_proxyvm_nm(self):
  124. self.qc.lock_db_for_writing()
  125. self.qc.load()
  126. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  127. name="%sproxy" % VM_PREFIX,
  128. template=self.qc.get_default_template())
  129. self.proxy.create_on_disk(verbose=False)
  130. self.proxy.netvm = self.testnetvm
  131. self.proxy.services['network-manager'] = True
  132. self.testvm1.netvm = self.proxy
  133. self.qc.save()
  134. self.qc.unlock_db()
  135. self.testvm1.start()
  136. self.assertTrue(self.proxy.is_running())
  137. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  138. "Ping by IP failed")
  139. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  140. "Ping by name failed")
  141. # reconnect to make sure that device was configured by NM
  142. self.assertEqual(
  143. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  144. user="user"),
  145. 0, "Failed to disconnect eth0 using nmcli")
  146. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 1,
  147. "Network should be disabled, but apparently it isn't")
  148. self.assertEqual(
  149. self.run_cmd(self.proxy, "nmcli connection up \"VM uplink eth0\" "
  150. "ifname eth0",
  151. user="user"),
  152. 0, "Failed to connect eth0 using nmcli")
  153. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  154. "Failed to wait for NM connection")
  155. # check for nm-applet presence
  156. self.assertEqual(subprocess.call([
  157. 'xdotool', 'search', '--all', '--name',
  158. '--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)
  159. ], stdout=open('/dev/null', 'w')), 0, "nm-applet window not found")
  160. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  161. "Ping by IP failed (after NM reconnection")
  162. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  163. "Ping by name failed (after NM reconnection)")
  164. def test_030_firewallvm_firewall(self):
  165. self.qc.lock_db_for_writing()
  166. self.qc.load()
  167. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  168. name="%sproxy" % VM_PREFIX,
  169. template=self.qc.get_default_template())
  170. self.proxy.create_on_disk(verbose=False)
  171. self.proxy.netvm = self.testnetvm
  172. self.testvm1.netvm = self.proxy
  173. self.qc.save()
  174. self.qc.unlock_db()
  175. # block all for first
  176. self.testvm1.write_firewall_conf({
  177. 'allow': False,
  178. 'allowDns': False,
  179. 'allowIcmp': False,
  180. })
  181. self.testvm1.start()
  182. self.assertTrue(self.proxy.is_running())
  183. self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
  184. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  185. "Ping by IP from ProxyVM failed")
  186. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  187. "Ping by name from ProxyVM failed")
  188. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 1,
  189. "Ping by IP should be blocked")
  190. nc_cmd = "nc -w 1 --recv-only 192.168.123.45 1234"
  191. self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 1,
  192. "TCP connection should be blocked")
  193. # block all except ICMP
  194. self.testvm1.write_firewall_conf({
  195. 'allow': False,
  196. 'allowDns': False,
  197. 'allowIcmp': True,
  198. })
  199. self.proxy.write_iptables_xenstore_entry()
  200. # Ugly hack b/c there is no feedback when the rules are actually applied
  201. time.sleep(1)
  202. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  203. "Ping by IP failed (should be allowed now)")
  204. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 2,
  205. "Ping by name should be blocked")
  206. # all TCP still blocked
  207. self.testvm1.write_firewall_conf({
  208. 'allow': False,
  209. 'allowDns': True,
  210. 'allowIcmp': True,
  211. })
  212. self.proxy.write_iptables_xenstore_entry()
  213. # Ugly hack b/c there is no feedback when the rules are actually applied
  214. time.sleep(1)
  215. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  216. "Ping by name failed (should be allowed now)")
  217. self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 1,
  218. "TCP connection should be blocked")
  219. # block all except target
  220. self.testvm1.write_firewall_conf({
  221. 'allow': False,
  222. 'allowDns': True,
  223. 'allowIcmp': True,
  224. 'rules': [{'address': '192.168.123.45',
  225. 'netmask': 32,
  226. 'proto': 'tcp',
  227. 'portBegin': 1234
  228. }] })
  229. self.proxy.write_iptables_xenstore_entry()
  230. # Ugly hack b/c there is no feedback when the rules are actually applied
  231. time.sleep(1)
  232. self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
  233. "TCP connection failed (should be allowed now)")
  234. # allow all except target
  235. self.testvm1.write_firewall_conf({
  236. 'allow': True,
  237. 'allowDns': True,
  238. 'allowIcmp': True,
  239. 'rules': [{'address': '192.168.123.45',
  240. 'netmask': 32,
  241. 'proto': 'tcp',
  242. 'portBegin': 1234
  243. }]
  244. })
  245. self.proxy.write_iptables_xenstore_entry()
  246. # Ugly hack b/c there is no feedback when the rules are actually applied
  247. time.sleep(1)
  248. self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 1,
  249. "TCP connection should be blocked")
  250. def test_040_inter_vm(self):
  251. self.qc.lock_db_for_writing()
  252. self.qc.load()
  253. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  254. name="%sproxy" % VM_PREFIX,
  255. template=self.qc.get_default_template())
  256. self.proxy.create_on_disk(verbose=False)
  257. self.proxy.netvm = self.testnetvm
  258. self.testvm1.netvm = self.proxy
  259. self.testvm2 = self.qc.add_new_vm("QubesAppVm",
  260. name="%svm3" % VM_PREFIX,
  261. template=self.qc.get_default_template())
  262. self.testvm2.create_on_disk(verbose=False)
  263. self.testvm2.netvm = self.proxy
  264. self.qc.save()
  265. self.qc.unlock_db()
  266. self.testvm1.start()
  267. self.testvm2.start()
  268. self.assertEqual(self.run_cmd(self.testvm1,
  269. "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 1)
  270. self.testvm2.netvm = self.testnetvm
  271. self.assertEqual(self.run_cmd(self.testvm1,
  272. "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 1)
  273. self.assertEqual(self.run_cmd(self.testvm2,
  274. "ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 1)
  275. self.testvm1.netvm = self.testnetvm
  276. self.assertEqual(self.run_cmd(self.testvm1,
  277. "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 1)
  278. self.assertEqual(self.run_cmd(self.testvm2,
  279. "ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 1)