network.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. from distutils import spawn
  25. import multiprocessing
  26. import os
  27. import subprocess
  28. import unittest
  29. import time
  30. from qubes.qubes import QubesVmCollection, defaults
  31. import qubes.tests
  32. class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
  33. test_ip = '192.168.123.45'
  34. test_name = 'test.example.com'
  35. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  36. ping_ip = ping_cmd.format(target=test_ip)
  37. ping_name = ping_cmd.format(target=test_name)
  38. def run_cmd(self, vm, cmd, user="root"):
  39. p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
  40. p.stdin.close()
  41. p.stdout.read()
  42. return p.wait()
  43. def setUp(self):
  44. super(VmNetworkingMixin, self).setUp()
  45. self.testnetvm = self.qc.add_new_vm("QubesNetVm",
  46. name=self.make_vm_name('netvm1'),
  47. template=self.qc.get_vm_by_name(self.template))
  48. self.testnetvm.create_on_disk(verbose=False)
  49. self.testvm1 = self.qc.add_new_vm("QubesAppVm",
  50. name=self.make_vm_name('vm2'),
  51. template=self.qc.get_vm_by_name(self.template))
  52. self.testvm1.create_on_disk(verbose=False)
  53. self.testvm1.netvm = self.testnetvm
  54. self.qc.save()
  55. self.configure_netvm()
  56. def configure_netvm(self):
  57. def run_netvm_cmd(cmd):
  58. if self.run_cmd(self.testnetvm, cmd) != 0:
  59. self.fail("Command '%s' failed" % cmd)
  60. if not self.testnetvm.is_running():
  61. self.testnetvm.start()
  62. # Ensure that dnsmasq is installed:
  63. p = self.testnetvm.run("dnsmasq --version", user="root",
  64. passio_popen=True)
  65. if p.wait() != 0:
  66. self.skipTest("dnsmasq not installed")
  67. run_netvm_cmd("ip link add test0 type dummy")
  68. run_netvm_cmd("ip link set test0 up")
  69. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  70. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT".format(self.test_ip))
  71. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  72. ip=self.test_ip, name=self.test_name))
  73. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  74. self.test_ip))
  75. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  76. def test_000_simple_networking(self):
  77. self.qc.unlock_db()
  78. self.testvm1.start()
  79. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  80. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  81. def test_010_simple_proxyvm(self):
  82. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  83. name=self.make_vm_name('proxy'),
  84. template=self.qc.get_vm_by_name(self.template))
  85. self.proxy.create_on_disk(verbose=False)
  86. self.proxy.netvm = self.testnetvm
  87. self.testvm1.netvm = self.proxy
  88. self.qc.save()
  89. self.qc.unlock_db()
  90. self.testvm1.start()
  91. self.assertTrue(self.proxy.is_running())
  92. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  93. "Ping by IP from ProxyVM failed")
  94. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  95. "Ping by name from ProxyVM failed")
  96. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  97. "Ping by IP from AppVM failed")
  98. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  99. "Ping by IP from AppVM failed")
  100. @unittest.skipUnless(spawn.find_executable('xdotool'),
  101. "xdotool not installed")
  102. def test_020_simple_proxyvm_nm(self):
  103. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  104. name=self.make_vm_name('proxy'),
  105. template=self.qc.get_vm_by_name(self.template))
  106. self.proxy.create_on_disk(verbose=False)
  107. self.proxy.netvm = self.testnetvm
  108. self.proxy.services['network-manager'] = True
  109. self.testvm1.netvm = self.proxy
  110. self.qc.save()
  111. self.qc.unlock_db()
  112. self.testvm1.start()
  113. self.assertTrue(self.proxy.is_running())
  114. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  115. "Ping by IP failed")
  116. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  117. "Ping by name failed")
  118. # reconnect to make sure that device was configured by NM
  119. self.assertEqual(
  120. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  121. user="user"),
  122. 0, "Failed to disconnect eth0 using nmcli")
  123. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  124. "Network should be disabled, but apparently it isn't")
  125. self.assertEqual(
  126. self.run_cmd(self.proxy,
  127. 'nmcli connection up "VM uplink eth0" ifname eth0',
  128. user="user"),
  129. 0, "Failed to connect eth0 using nmcli")
  130. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  131. "Failed to wait for NM connection")
  132. # check for nm-applet presence
  133. self.assertEqual(subprocess.call([
  134. 'xdotool', 'search', '--all', '--name',
  135. '--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)],
  136. stdout=open('/dev/null', 'w')), 0, "nm-applet window not found")
  137. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  138. "Ping by IP failed (after NM reconnection")
  139. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  140. "Ping by name failed (after NM reconnection)")
  141. def test_030_firewallvm_firewall(self):
  142. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  143. name=self.make_vm_name('proxy'),
  144. template=self.qc.get_vm_by_name(self.template))
  145. self.proxy.create_on_disk(verbose=False)
  146. self.proxy.netvm = self.testnetvm
  147. self.testvm1.netvm = self.proxy
  148. self.qc.save()
  149. self.qc.unlock_db()
  150. # block all for first
  151. self.testvm1.write_firewall_conf({
  152. 'allow': False,
  153. 'allowDns': False,
  154. 'allowIcmp': False,
  155. })
  156. self.testvm1.start()
  157. self.assertTrue(self.proxy.is_running())
  158. self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
  159. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  160. "Ping by IP from ProxyVM failed")
  161. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  162. "Ping by name from ProxyVM failed")
  163. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  164. "Ping by IP should be blocked")
  165. nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
  166. self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
  167. "TCP connection should be blocked")
  168. # block all except ICMP
  169. self.testvm1.write_firewall_conf({
  170. 'allow': False,
  171. 'allowDns': False,
  172. 'allowIcmp': True,
  173. })
  174. self.proxy.write_iptables_qubesdb_entry()
  175. # Ugly hack b/c there is no feedback when the rules are actually applied
  176. time.sleep(3)
  177. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  178. "Ping by IP failed (should be allowed now)")
  179. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  180. "Ping by name should be blocked")
  181. # all TCP still blocked
  182. self.testvm1.write_firewall_conf({
  183. 'allow': False,
  184. 'allowDns': True,
  185. 'allowIcmp': True,
  186. })
  187. self.proxy.write_iptables_qubesdb_entry()
  188. # Ugly hack b/c there is no feedback when the rules are actually applied
  189. time.sleep(3)
  190. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  191. "Ping by name failed (should be allowed now)")
  192. self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
  193. "TCP connection should be blocked")
  194. # block all except target
  195. self.testvm1.write_firewall_conf({
  196. 'allow': False,
  197. 'allowDns': True,
  198. 'allowIcmp': True,
  199. 'rules': [{'address': self.test_ip,
  200. 'netmask': 32,
  201. 'proto': 'tcp',
  202. 'portBegin': 1234
  203. }] })
  204. self.proxy.write_iptables_qubesdb_entry()
  205. # Ugly hack b/c there is no feedback when the rules are actually applied
  206. time.sleep(3)
  207. self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
  208. "TCP connection failed (should be allowed now)")
  209. # allow all except target
  210. self.testvm1.write_firewall_conf({
  211. 'allow': True,
  212. 'allowDns': True,
  213. 'allowIcmp': True,
  214. 'rules': [{'address': self.test_ip,
  215. 'netmask': 32,
  216. 'proto': 'tcp',
  217. 'portBegin': 1234
  218. }]
  219. })
  220. self.proxy.write_iptables_qubesdb_entry()
  221. # Ugly hack b/c there is no feedback when the rules are actually applied
  222. time.sleep(3)
  223. self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
  224. "TCP connection should be blocked")
  225. def test_040_inter_vm(self):
  226. self.proxy = self.qc.add_new_vm("QubesProxyVm",
  227. name=self.make_vm_name('proxy'),
  228. template=self.qc.get_vm_by_name(self.template))
  229. self.proxy.create_on_disk(verbose=False)
  230. self.proxy.netvm = self.testnetvm
  231. self.testvm1.netvm = self.proxy
  232. self.testvm2 = self.qc.add_new_vm("QubesAppVm",
  233. name=self.make_vm_name('vm3'),
  234. template=self.qc.get_vm_by_name(self.template))
  235. self.testvm2.create_on_disk(verbose=False)
  236. self.testvm2.netvm = self.proxy
  237. self.qc.save()
  238. self.qc.unlock_db()
  239. self.testvm1.start()
  240. self.testvm2.start()
  241. self.assertNotEqual(self.run_cmd(self.testvm1,
  242. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  243. self.testvm2.netvm = self.testnetvm
  244. self.assertNotEqual(self.run_cmd(self.testvm1,
  245. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  246. self.assertNotEqual(self.run_cmd(self.testvm2,
  247. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  248. self.testvm1.netvm = self.testnetvm
  249. self.assertNotEqual(self.run_cmd(self.testvm1,
  250. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  251. self.assertNotEqual(self.run_cmd(self.testvm2,
  252. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  253. def load_tests(loader, tests, pattern):
  254. try:
  255. qc = qubes.qubes.QubesVmCollection()
  256. qc.lock_db_for_reading()
  257. qc.load()
  258. qc.unlock_db()
  259. templates = [vm.name for vm in qc.values() if
  260. isinstance(vm, qubes.qubes.QubesTemplateVm)]
  261. except OSError:
  262. templates = []
  263. for template in templates:
  264. tests.addTests(loader.loadTestsFromTestCase(
  265. type(
  266. 'VmNetworking_' + template,
  267. (VmNetworkingMixin, qubes.tests.QubesTestCase),
  268. {'template': template})))
  269. return tests